summaryrefslogtreecommitdiff
path: root/scripts/scalability/test_controller
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/scalability/test_controller')
-rw-r--r--scripts/scalability/test_controller/__init__.py0
-rw-r--r--scripts/scalability/test_controller/requirements.pip2
-rw-r--r--scripts/scalability/test_controller/server/__init__.py0
-rw-r--r--scripts/scalability/test_controller/server/server.tac238
-rwxr-xr-xscripts/scalability/test_controller/server/user_dbs.py88
5 files changed, 328 insertions, 0 deletions
diff --git a/scripts/scalability/test_controller/__init__.py b/scripts/scalability/test_controller/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/scripts/scalability/test_controller/__init__.py
diff --git a/scripts/scalability/test_controller/requirements.pip b/scripts/scalability/test_controller/requirements.pip
new file mode 100644
index 00000000..224b581e
--- /dev/null
+++ b/scripts/scalability/test_controller/requirements.pip
@@ -0,0 +1,2 @@
+psutil
+twisted
diff --git a/scripts/scalability/test_controller/server/__init__.py b/scripts/scalability/test_controller/server/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/scripts/scalability/test_controller/server/__init__.py
diff --git a/scripts/scalability/test_controller/server/server.tac b/scripts/scalability/test_controller/server/server.tac
new file mode 100644
index 00000000..c525e3ed
--- /dev/null
+++ b/scripts/scalability/test_controller/server/server.tac
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+
+"""
+Test Controller Server
+======================
+
+This script implements a test controller server for scalability tests. It can
+be triggered to setup user databases and start and stop monitoring different
+kinds of resources (cpu, mem, responsiveness, etc).
+
+HTTP API
+--------
+
+The HTTP API is very simple:
+
+ +---------------------+-----------------------------------------------------+
+ | POST /setup | setup server databases for scalability test. |
+ +---------------------+-----------------------------------------------------+
+ | POST /cpu?pid=<PID> | start monitoring a process' CPU usage. |
+ +---------------------|-----------------------------------------------------+
+ | GET /cpu | return the cpu percentage used by the process since |
+ | | last call to PUT. |
+ +---------------------|-----------------------------------------------------+
+ | POST /mem?pid=<PID> | start monitoring a process' memory usage. |
+ +---------------------|-----------------------------------------------------+
+ | GET /mem | return mem usage stats used by the process since |
+ | | call to PUT. |
+ +---------------------+-----------------------------------------------------+
+
+Environment Variables
+---------------------
+
+The following environment variables modify the behaviour of the resource
+monitor:
+
+ HTTP_PORT - use that port to listen for HTTP requests (default: 7001).
+"""
+
+import json
+import os
+import psutil
+
+from twisted.application import service, internet
+from twisted.web import resource, server
+from twisted.internet.task import LoopingCall
+from twisted.logger import Logger
+
+from test_controller.server.user_dbs import ensure_dbs
+
+
+DEFAULT_HTTP_PORT = 7001
+SUCCESS = json.dumps({'success': True})
+
+logger = Logger(__name__)
+
+
+#
+# Resource Watcher classes (mem, cpu)
+#
+
+class ResourceWatcher(object):
+
+ def __init__(self, pid):
+ logger.info('%s started for process with PID %d'
+ % (self.__class__.__name__, int(pid)))
+ self.loop_call = LoopingCall(self._loop)
+ self.process = psutil.Process(pid)
+ self.data = []
+ self.result = None
+
+ @property
+ def running(self):
+ return self.loop_call.running
+
+ def _loop(self):
+ pass
+
+ def start(self):
+ self._start()
+ d = self.loop_call.start(self.interval)
+ d.addCallback(self._stop)
+ return d
+
+ def _start(self):
+ pass
+
+ def _stop(self, _):
+ raise NotImplementedError
+
+ def stop(self):
+ self.loop_call.stop()
+
+
+class CpuWatcher(ResourceWatcher):
+
+ interval = 1
+
+ def _start(self):
+ self.process.cpu_percent()
+
+ def _stop(self, _):
+ self.result = {'cpu_percent': self.process.cpu_percent()}
+
+
+def _mean(l):
+ return float(sum(l)) / len(l)
+
+
+def _std(l):
+ if len(l) <= 1:
+ return 0
+ mean = _mean(l)
+ squares = [(x - mean) ** 2 for x in l]
+ return (sum(squares) / (len(l) - 1)) ** 0.5
+
+
+class MemoryWatcher(ResourceWatcher):
+
+ interval = 0.1
+
+ def _loop(self):
+ sample = self.process.memory_percent(memtype='rss')
+ self.data.append(sample)
+
+ def _stop(self, _):
+ stats = {
+ 'max': max(self.data),
+ 'min': min(self.data),
+ 'mean': _mean(self.data),
+ 'std': _std(self.data),
+ }
+ self.result = {
+ 'interval': self.interval,
+ 'samples': self.data,
+ 'memory_percent': stats,
+ }
+
+
+#
+# Resources for use with "twistd web"
+#
+
+class MissingPidError(Exception):
+
+ def __str__(self):
+ return "No PID was passed in request"
+
+
+class InvalidPidError(Exception):
+
+ def __init__(self, pid):
+ Exception.__init__(self)
+ self.pid = pid
+
+ def __str__(self):
+ return "Invalid PID: %r" % self.pid
+
+
+class MonitorResource(resource.Resource):
+ """
+ A generic resource-monitor web resource.
+ """
+
+ isLeaf = 1
+
+ def __init__(self, watcherClass):
+ resource.Resource.__init__(self)
+ self.watcherClass = watcherClass
+ self.watcher = None
+
+ def _get_pid(self, request):
+ if 'pid' not in request.args:
+ raise MissingPidError()
+ pid = request.args['pid'].pop()
+ if not pid.isdigit():
+ raise InvalidPidError(pid)
+ return int(pid)
+
+ def render_POST(self, request):
+ try:
+ pid = self._get_pid(request)
+ except Exception as e:
+ request.setResponseCode(500)
+ logger.error('Error processing request: %r' % e)
+ return json.dumps({'error': str(e)})
+ self._stop_watcher()
+ try:
+ self.watcher = self.watcherClass(pid)
+ self.watcher.start()
+ return SUCCESS
+ except psutil.NoSuchProcess as e:
+ request.setResponseCode(404)
+ return json.dumps({'error': str(e)})
+
+ def render_GET(self, request):
+ self._stop_watcher()
+ if self.watcher:
+ return json.dumps(self.watcher.result)
+ return json.dumps({})
+
+ def _stop_watcher(self):
+ if self.watcher and self.watcher.running:
+ self.watcher.stop()
+
+
+class SetupResource(resource.Resource):
+
+ def render_POST(self, request):
+ d = ensure_dbs()
+ d.addCallback(self._success, request)
+ d.addErrback(self._error, request)
+ return server.NOT_DONE_YET
+
+ def _success(self, _, request):
+ request.write(SUCCESS)
+ request.finish()
+
+ def _error(self, e, request):
+ logger.error('Error processing request: %s' % e.getErrorMessage())
+ request.setResponseCode(500)
+ request.write(json.dumps({'error': str(e)}))
+ request.finish()
+
+
+class Root(resource.Resource):
+
+ def __init__(self):
+ resource.Resource.__init__(self)
+ self.putChild('mem', MonitorResource(MemoryWatcher))
+ self.putChild('cpu', MonitorResource(CpuWatcher))
+ self.putChild('setup', SetupResource())
+
+
+application = service.Application("Resource Monitor")
+site = server.Site(Root())
+port = os.environ.get('HTTP_PORT', DEFAULT_HTTP_PORT)
+service = internet.TCPServer(port, site)
+service.setServiceParent(application)
diff --git a/scripts/scalability/test_controller/server/user_dbs.py b/scripts/scalability/test_controller/server/user_dbs.py
new file mode 100755
index 00000000..a1a9c222
--- /dev/null
+++ b/scripts/scalability/test_controller/server/user_dbs.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+
+# Handle creation of user databases for scalability tests.
+
+import argparse
+import treq
+
+from functools import partial
+from urlparse import urljoin
+
+from twisted.internet import reactor, defer
+from twisted.logger import Logger
+
+COUCH_URL = "http://127.0.0.1:5984"
+CREATE = 1000
+
+
+logger = Logger()
+
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--couch-url', default=COUCH_URL,
+ help='The URL to the CouchDB server.')
+ parser.add_argument('--create', default=CREATE,
+ help='The number of databases to create.')
+ return parser.parse_args()
+
+
+def get_db_names(create):
+ dbs = []
+ for i in xrange(create):
+ dbname = 'user-%d' % i
+ dbs.append(dbname)
+ return dbs
+
+
+semaphore = defer.DeferredSemaphore(20)
+
+
+def _log(db, action, res):
+ logger.info('table %s %s' % (db, action))
+ return res
+
+
+@defer.inlineCallbacks
+def delete_dbs(dbs):
+ deferreds = []
+ for db in dbs:
+ d = semaphore.run(treq.delete, urljoin(COUCH_URL, db))
+ logfun = partial(_log, db, 'deleted')
+ d.addCallback(logfun)
+ deferreds.append(d)
+ responses = yield defer.gatherResults(deferreds)
+ codes = map(lambda r: r.code, responses)
+ assert all(map(lambda c: c == 200 or c == 404, codes))
+
+
+@defer.inlineCallbacks
+def create_dbs(dbs):
+ deferreds = []
+ for db in dbs:
+ d = semaphore.run(treq.put, urljoin(COUCH_URL, db))
+ logfun = partial(_log, db, 'created')
+ d.addCallback(logfun)
+ deferreds.append(d)
+ responses = yield defer.gatherResults(deferreds)
+ codes = map(lambda r: r.code, responses)
+ assert all(map(lambda c: c == 201, codes))
+
+
+@defer.inlineCallbacks
+def ensure_dbs(couch_url=COUCH_URL, create=CREATE):
+ dbs = get_db_names(create)
+ yield delete_dbs(dbs)
+ yield create_dbs(dbs)
+
+
+@defer.inlineCallbacks
+def main(couch_url, create):
+ yield ensure_dbs(couch_url, create)
+ reactor.stop()
+
+
+if __name__ == '__main__':
+ args = parse_args()
+ d = main(args.couch_url, args.create)
+ reactor.run()