[feat] defer blocking requests calls to thread
authorVictor Shyba <victor.shyba@gmail.com>
Wed, 27 Jan 2016 23:45:00 +0000 (20:45 -0300)
committerVictor Shyba <victor.shyba@gmail.com>
Wed, 27 Jan 2016 23:45:00 +0000 (20:45 -0300)
That's a temporary fix for #6506
This commit adapts code to deal with deferreds coming from calling
requests from Twisted. Next step is just to change requests for twisted
http client present in leap.common.
Unfortunately, this last step will be a bit longer and would be better
to have integrations tests to ensure current HTTP behaviour.

src/leap/keymanager/__init__.py

index c7886e0..06128c0 100644 (file)
@@ -58,6 +58,7 @@ import logging
 import requests
 
 from twisted.internet import defer
+from twisted.internet import threads
 from urlparse import urlparse
 
 from leap.common.check import leap_assert
@@ -185,6 +186,7 @@ class KeyManager(object):
             lambda klass: klass.__name__ == ktype,
             self._wrapper_map).pop()
 
+    @defer.inlineCallbacks
     def _get(self, uri, data=None):
         """
         Send a GET request to C{uri} containing C{data}.
@@ -200,7 +202,8 @@ class KeyManager(object):
         leap_assert(
             self._ca_cert_path is not None,
             'We need the CA certificate path!')
-        res = self._fetcher.get(uri, data=data, verify=self._ca_cert_path)
+        res = yield threads.deferToThread(self._fetcher.get, uri, data=data,
+                                          verify=self._ca_cert_path)
         # Nickserver now returns 404 for key not found and 500 for
         # other cases (like key too small), so we are skipping this
         # check for the time being
@@ -211,7 +214,7 @@ class KeyManager(object):
         # leap_assert(
         #     res.headers['content-type'].startswith('application/json'),
         #     'Content-type is not JSON.')
-        return res
+        defer.returnValue(res)
 
     def _get_with_combined_ca_bundle(self, uri, data=None):
         """
@@ -228,9 +231,11 @@ class KeyManager(object):
         :return: The response to the request.
         :rtype: requests.Response
         """
-        return self._fetcher.get(
-            uri, data=data, verify=self._combined_ca_bundle)
+        return threads.deferToThread(self._fetcher.get,
+                                     uri, data=data,
+                                     verify=self._combined_ca_bundle)
 
+    @defer.inlineCallbacks
     def _put(self, uri, data=None):
         """
         Send a PUT request to C{uri} containing C{data}.
@@ -253,14 +258,17 @@ class KeyManager(object):
         leap_assert(
             self._token is not None,
             'We need a token to interact with webapp!')
-        res = self._fetcher.put(
-            uri, data=data, verify=self._ca_cert_path,
-            headers={'Authorization': 'Token token=%s' % self._token})
+        headers = {'Authorization': 'Token token=%s' % self._token}
+        res = yield threads.deferToThread(self._fetcher.put,
+                                          uri, data=data,
+                                          verify=self._ca_cert_path,
+                                          headers=headers)
         # assert that the response is valid
         res.raise_for_status()
-        return res
+        defer.returnValue(res)
 
     @memoized_method(invalidation=300)
+    @defer.inlineCallbacks
     def _fetch_keys_from_server(self, address):
         """
         Fetch keys bound to address from nickserver and insert them in
@@ -279,7 +287,7 @@ class KeyManager(object):
         d = defer.succeed(None)
         res = None
         try:
-            res = self._get(self._nickserver_uri, {'address': address})
+            res = yield self._get(self._nickserver_uri, {'address': address})
             res.raise_for_status()
             server_keys = res.json()
 
@@ -307,7 +315,7 @@ class KeyManager(object):
         except Exception as e:
             d = defer.fail(KeyNotFound(e.message))
             logger.warning("Error retrieving key: %r" % (e,))
-        return d
+        yield d
 
     #
     # key management
@@ -339,8 +347,9 @@ class KeyManager(object):
                 self._api_uri,
                 self._api_version,
                 self._uid)
-            self._put(uri, data)
+            d = self._put(uri, data)
             emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
+            return d
 
         d = self.get_key(
             self._address, ktype, private=False, fetch_remote=False)
@@ -822,6 +831,7 @@ class KeyManager(object):
             d.addCallback(lambda _: self.put_key(privkey, address))
         return d
 
+    @defer.inlineCallbacks
     def fetch_key(self, address, uri, ktype,
                   validation=ValidationLevels.Weak_Chain):
         """
@@ -852,20 +862,20 @@ class KeyManager(object):
 
         logger.info("Fetch key for %s from %s" % (address, uri))
         try:
-            res = self._get_with_combined_ca_bundle(uri)
+            res = yield self._get_with_combined_ca_bundle(uri)
         except Exception as e:
             logger.warning("There was a problem fetching key: %s" % (e,))
-            return defer.fail(KeyNotFound(uri))
+            raise KeyNotFound(uri)
         if not res.ok:
-            return defer.fail(KeyNotFound(uri))
+            raise KeyNotFound(uri)
 
         # XXX parse binary keys
         pubkey, _ = _keys.parse_ascii_key(res.content)
         if pubkey is None:
-            return defer.fail(KeyNotFound(uri))
+            raise KeyNotFound(uri)
 
         pubkey.validation = validation
-        return self.put_key(pubkey, address)
+        yield self.put_key(pubkey, address)
 
     def _assert_supported_key_type(self, ktype):
         """