Always use the same client agent in multiprocessing test.
authordrebs <drebs@riseup.net>
Sat, 6 Jan 2018 10:00:31 +0000 (08:00 -0200)
committerdrebs <drebs@riseup.net>
Sat, 6 Jan 2018 10:00:31 +0000 (08:00 -0200)
blobs-multiprocess/request.py

index b73fb79..9f35ca0 100755 (executable)
@@ -5,7 +5,7 @@ import time
 import treq
 from argparse import ArgumentParser
 from twisted.internet import reactor, task, defer
-from twisted.web.client import readBody, HTTPConnectionPool
+from twisted.web.client import readBody, HTTPConnectionPool, Agent
 from urlparse import urljoin
 from uuid import uuid4
 
@@ -15,7 +15,14 @@ BLOBS_URI = urljoin(BASE_URI,
                     'blobs/{}/'.format(time.strftime('%Y-%m-%d_%H-%M-%s')))
 CONCURRENT = 10
 
-pool = HTTPConnectionPool(reactor)
+
+def get_client():
+    pool = HTTPConnectionPool(reactor)
+    agent = Agent(reactor, pool=pool)
+    return treq.client.HTTPClient(agent)
+
+
+_client = get_client()
 
 
 def parse_args():
@@ -38,8 +45,9 @@ def parse_args():
     return args
 
 
-def _finished(_, amount, size):
-    print("Finished putting {} blobs of size {}K.".format(amount, size))
+def _finished(_, start):
+    end = time.time()
+    print(end - start)
     reactor.stop()
 
 
@@ -51,7 +59,10 @@ def _error(failure):
 def main(generator):
     cooperator = task.Cooperator()
     cooptask = cooperator.cooperate(generator)
+    start = time.time()
     d = cooptask.whenDone()
+    d.addCallback(_finished, start)
+    d.addErrback(_error)
     return d
 
 
@@ -59,47 +70,51 @@ def requests_generator(args):
     data = "a" * args.size * 1024
 
     def _get(_, uri):
-        d = treq.get(uri, pool=pool)
+        d = _client.get(uri)
         d.addCallback(lambda response: readBody(response))
         return d
 
     def _flag(_, uri):
         flags = BytesIO('["PROCESSING"]')
-        d = treq.post(uri, data=flags, pool=pool)
+        d = _client.post(uri, data=flags)
         return d
 
     def _delete(_, uri):
-        d = treq.delete(uri, pool=pool)
+        d = _client.delete(uri)
         return d
 
     deferreds = []
     for i in xrange(args.amount):
         if args.baseline:
-            d = treq.get(BASE_URI, pool=pool)
+            import pdb
+            pdb.set_trace()
+            d = _client.get(BASE_URI)
 
         elif args.list:
-            d = treq.get(BLOBS_URI, pool=pool)
+            d = _client.get(BLOBS_URI)
 
         else:
             uri = urljoin(BLOBS_URI, uuid4().hex)
-            d = treq.put(uri, data=data, pool=pool)
+            d = _client.put(uri, data=data)
+
             if args.get:
                 d.addCallback(_get, uri)
+
             if args.flag:
                 d.addCallback(_flag, uri)
+
             if args.delete:
                 d.addCallback(_delete, uri)
 
         deferreds.append(d)
         yield None
 
-    yield defer.gatherResults(deferreds)
+    d = defer.gatherResults(deferreds)
+    yield d
 
 
 if __name__ == "__main__":
     args = parse_args()
     generator = requests_generator(args)
     d = main(generator)
-    d.addCallback(_finished, args.amount, args.size)
-    d.addErrback(_error)
     reactor.run()