[feat] defer decrypt, gen_key and encrypt
author Victor Shyba <victor.shyba@gmail.com>
Thu, 11 Feb 2016 22:22:34 +0000 (19:22 -0300)
committer Victor Shyba <victor.shyba@gmail.com>
Tue, 23 Feb 2016 17:37:27 +0000 (14:37 -0300)
This commit moves the gnupg decrypt, gen_key and encrypt operations onto
external threads, with the number of concurrent calls limited to the number
of CPU cores present on the user's machine.
Some gnupg calls spawn processes, and communicating with them is a
synchronous operation, so running them outside of the reactor should improve
response time by avoiding reactor locking.

changes/async_gpg [new file with mode: 0644]
src/leap/keymanager/__init__.py
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_openpgp.py

diff --git a/changes/async_gpg b/changes/async_gpg
new file mode 100644 (file)
index 0000000..59d4d41
--- /dev/null
@@ -0,0 +1 @@
+-- Defer encrypt, decrypt and gen_key operations from gnupg to external threads, limited by cpu core amount.
index 7e4d30e..9aa7139 100644 (file)
@@ -572,15 +572,15 @@ class KeyManager(object):
         self._assert_supported_key_type(ktype)
         _keys = self._wrapper_map[ktype]
 
+        @defer.inlineCallbacks
         def encrypt(keys):
             pubkey, signkey = keys
-            encrypted = _keys.encrypt(
+            encrypted = yield _keys.encrypt(
                 data, pubkey, passphrase, sign=signkey,
                 cipher_algo=cipher_algo)
             pubkey.encr_used = True
-            d = _keys.put_key(pubkey, address)
-            d.addCallback(lambda _: encrypted)
-            return d
+            yield _keys.put_key(pubkey, address)
+            defer.returnValue(encrypted)
 
         dpub = self.get_key(address, ktype, private=False,
                             fetch_remote=fetch_remote)
@@ -625,9 +625,10 @@ class KeyManager(object):
         self._assert_supported_key_type(ktype)
         _keys = self._wrapper_map[ktype]
 
+        @defer.inlineCallbacks
         def decrypt(keys):
             pubkey, privkey = keys
-            decrypted, signed = _keys.decrypt(
+            decrypted, signed = yield _keys.decrypt(
                 data, privkey, passphrase=passphrase, verify=pubkey)
             if pubkey is None:
                 signature = KeyNotFound(verify)
@@ -635,14 +636,13 @@ class KeyManager(object):
                 signature = pubkey
                 if not pubkey.sign_used:
                     pubkey.sign_used = True
-                    d = _keys.put_key(pubkey, verify)
-                    d.addCallback(lambda _: (decrypted, signature))
-                    return d
+                    yield _keys.put_key(pubkey, verify)
+                    defer.returnValue((decrypted, signature))
             else:
                 signature = InvalidSignature(
                     'Failed to verify signature with key %s' %
                     (pubkey.key_id,))
-            return (decrypted, signature)
+            defer.returnValue((decrypted, signature))
 
         dpriv = self.get_key(address, ktype, private=True)
         dpub = defer.succeed(None)
index d648137..9064043 100644 (file)
@@ -27,9 +27,11 @@ import io
 
 
 from datetime import datetime
+from multiprocessing import cpu_count
 from gnupg import GPG
 from gnupg.gnupg import GPGUtilities
 from twisted.internet import defer
+from twisted.internet.threads import deferToThread
 
 from leap.common.check import leap_assert, leap_assert_type, leap_check
 from leap.keymanager import errors
@@ -55,6 +57,16 @@ logger = logging.getLogger(__name__)
 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
 #
 
+# This function will be used to call blocking GPG functions outside
+# of Twisted reactor and match the concurrent calls to the amount of CPU cores
+cpu_core_semaphore = defer.DeferredSemaphore(cpu_count())
+
+
+def from_thread(func, *args, **kwargs):
+    call = lambda: deferToThread(func, *args, **kwargs)
+    return cpu_core_semaphore.run(call)
+
+
 class TempGPGWrapper(object):
     """
     A context manager that wraps a temporary GPG keyring which only contains
@@ -253,6 +265,7 @@ class OpenPGPScheme(EncryptionScheme):
         # make sure the key does not already exist
         leap_assert(is_address(address), 'Not an user address: %s' % address)
 
+        @defer.inlineCallbacks
         def _gen_key(_):
             with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
                 # TODO: inspect result, or use decorator
@@ -264,7 +277,7 @@ class OpenPGPScheme(EncryptionScheme):
                     name_comment='')
                 logger.info("About to generate keys... "
                             "This might take SOME time.")
-                gpg.gen_key(params)
+                yield from_thread(gpg.gen_key, params)
                 logger.info("Keys for %s have been successfully "
                             "generated." % (address,))
                 pubkeys = gpg.list_keys()
@@ -293,7 +306,7 @@ class OpenPGPScheme(EncryptionScheme):
                         gpg.export_keys(key['fingerprint'], secret=secret))
                     d = self.put_key(openpgp_key, address)
                     deferreds.append(d)
-                return defer.gatherResults(deferreds)
+                yield defer.gatherResults(deferreds)
 
         def key_already_exists(_):
             raise errors.KeyAlreadyExists(address)
@@ -686,6 +699,7 @@ class OpenPGPScheme(EncryptionScheme):
             raise errors.GPGError(
                 'Failed to encrypt/decrypt: %s' % stderr)
 
+    @defer.inlineCallbacks
     def encrypt(self, data, pubkey, passphrase=None, sign=None,
                 cipher_algo='AES256'):
         """
@@ -700,8 +714,8 @@ class OpenPGPScheme(EncryptionScheme):
         :param cipher_algo: The cipher algorithm to use.
         :type cipher_algo: str
 
-        :return: The encrypted data.
-        :rtype: str
+        :return: A Deferred that will be fired with the encrypted data.
+        :rtype: defer.Deferred
 
         :raise EncryptError: Raised if failed encrypting for some reason.
         """
@@ -713,7 +727,8 @@ class OpenPGPScheme(EncryptionScheme):
             leap_assert(sign.private is True)
             keys.append(sign)
         with TempGPGWrapper(keys, self._gpgbinary) as gpg:
-            result = gpg.encrypt(
+            result = yield from_thread(
+                gpg.encrypt,
                 data, pubkey.fingerprint,
                 default_key=sign.key_id if sign else None,
                 passphrase=passphrase, symmetric=False,
@@ -724,11 +739,12 @@ class OpenPGPScheme(EncryptionScheme):
             # result.data  - (bool) contains the result of the operation
             try:
                 self._assert_gpg_result_ok(result)
-                return result.data
+                defer.returnValue(result.data)
             except errors.GPGError as e:
                 logger.error('Failed to decrypt: %s.' % str(e))
                 raise errors.EncryptError()
 
+    @defer.inlineCallbacks
     def decrypt(self, data, privkey, passphrase=None, verify=None):
         """
         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
@@ -743,8 +759,9 @@ class OpenPGPScheme(EncryptionScheme):
         :param verify: The key used to verify a signature.
         :type verify: OpenPGPKey
 
-        :return: The decrypted data and if signature verifies
-        :rtype: (unicode, bool)
+        :return: Deferred that will fire with the decrypted data and
+                 if signature verifies (unicode, bool)
+        :rtype: Deferred
 
         :raise DecryptError: Raised if failed decrypting for some reason.
         """
@@ -756,8 +773,9 @@ class OpenPGPScheme(EncryptionScheme):
             keys.append(verify)
         with TempGPGWrapper(keys, self._gpgbinary) as gpg:
             try:
-                result = gpg.decrypt(
-                    data, passphrase=passphrase, always_trust=True)
+                result = yield from_thread(gpg.decrypt,
+                                           data, passphrase=passphrase,
+                                           always_trust=True)
                 self._assert_gpg_result_ok(result)
 
                 # verify signature
@@ -767,7 +785,7 @@ class OpenPGPScheme(EncryptionScheme):
                         verify.fingerprint == result.pubkey_fingerprint):
                     sign_valid = True
 
-                return (result.data, sign_valid)
+                defer.returnValue((result.data, sign_valid))
             except errors.GPGError as e:
                 logger.error('Failed to decrypt: %s.' % str(e))
                 raise errors.DecryptError(str(e))
index bae83db..96b40a0 100644 (file)
@@ -109,7 +109,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         # encrypt
         yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         pubkey = yield pgp.get_key(ADDRESS, private=False)
-        cyphertext = pgp.encrypt(data, pubkey)
+        cyphertext = yield pgp.encrypt(data, pubkey)
 
         self.assertTrue(cyphertext is not None)
         self.assertTrue(cyphertext != '')
@@ -121,7 +121,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         yield self._assert_key_not_found(pgp, ADDRESS, private=True)
         yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         privkey = yield pgp.get_key(ADDRESS, private=True)
-        decrypted, _ = pgp.decrypt(cyphertext, privkey)
+        decrypted, _ = yield pgp.decrypt(cyphertext, privkey)
         self.assertEqual(decrypted, data)
 
         yield pgp.delete_key(pubkey)
@@ -171,9 +171,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         privkey = yield pgp.get_key(ADDRESS, private=True)
         pubkey = yield pgp.get_key(ADDRESS, private=False)
-        self.assertRaises(
-            AssertionError,
-            pgp.encrypt, data, privkey, sign=pubkey)
+        self.failureResultOf(
+            pgp.encrypt(data, privkey, sign=pubkey),
+            AssertionError)
 
     @inlineCallbacks
     def test_decrypt_verify_with_private_raises(self):
@@ -183,12 +183,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         privkey = yield pgp.get_key(ADDRESS, private=True)
         pubkey = yield pgp.get_key(ADDRESS, private=False)
-        encrypted_and_signed = pgp.encrypt(
+        encrypted_and_signed = yield pgp.encrypt(
             data, pubkey, sign=privkey)
-        self.assertRaises(
-            AssertionError,
-            pgp.decrypt,
-            encrypted_and_signed, privkey, verify=privkey)
+        self.failureResultOf(
+            pgp.decrypt(encrypted_and_signed, privkey, verify=privkey),
+            AssertionError)
 
     @inlineCallbacks
     def test_decrypt_verify_with_wrong_key(self):
@@ -198,11 +197,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         privkey = yield pgp.get_key(ADDRESS, private=True)
         pubkey = yield pgp.get_key(ADDRESS, private=False)
-        encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
+        encrypted_and_signed = yield pgp.encrypt(data, pubkey, sign=privkey)
         yield pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
         wrongkey = yield pgp.get_key(ADDRESS_2)
-        decrypted, validsign = pgp.decrypt(encrypted_and_signed, privkey,
-                                           verify=wrongkey)
+        decrypted, validsign = yield pgp.decrypt(encrypted_and_signed,
+                                                 privkey,
+                                                 verify=wrongkey)
         self.assertEqual(decrypted, data)
         self.assertFalse(validsign)
 
@@ -232,9 +232,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         privkey2 = yield pgp.get_key(ADDRESS_2, private=True)
 
         data = 'data'
-        encrypted_and_signed = pgp.encrypt(
+        encrypted_and_signed = yield pgp.encrypt(
             data, pubkey2, sign=privkey)
-        res, validsign = pgp.decrypt(
+        res, validsign = yield pgp.decrypt(
             encrypted_and_signed, privkey2, verify=pubkey)
         self.assertEqual(data, res)
         self.assertTrue(validsign)