diff options
Diffstat (limited to 'scripts/scalability/test_controller')
5 files changed, 328 insertions, 0 deletions
diff --git a/scripts/scalability/test_controller/__init__.py b/scripts/scalability/test_controller/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/scripts/scalability/test_controller/__init__.py diff --git a/scripts/scalability/test_controller/requirements.pip b/scripts/scalability/test_controller/requirements.pip new file mode 100644 index 00000000..224b581e --- /dev/null +++ b/scripts/scalability/test_controller/requirements.pip @@ -0,0 +1,2 @@ +psutil +twisted diff --git a/scripts/scalability/test_controller/server/__init__.py b/scripts/scalability/test_controller/server/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/scripts/scalability/test_controller/server/__init__.py diff --git a/scripts/scalability/test_controller/server/server.tac b/scripts/scalability/test_controller/server/server.tac new file mode 100644 index 00000000..c525e3ed --- /dev/null +++ b/scripts/scalability/test_controller/server/server.tac @@ -0,0 +1,238 @@ +#!/usr/bin/env python + +""" +Test Controller Server +====================== + +This script implements a test controller server for scalability tests. It can +be triggered to setup user databases and start and stop monitoring different +kinds of resources (cpu, mem, responsiveness, etc). + +HTTP API +-------- + +The HTTP API is very simple: + + +---------------------+-----------------------------------------------------+ + | POST /setup | setup server databases for scalability test. | + +---------------------+-----------------------------------------------------+ + | POST /cpu?pid=<PID> | start monitoring a process' CPU usage. | + +---------------------|-----------------------------------------------------+ + | GET /cpu | return the cpu percentage used by the process since | + | | last call to PUT. | + +---------------------|-----------------------------------------------------+ + | POST /mem?pid=<PID> | start monitoring a process' memory usage. | + +---------------------|-----------------------------------------------------+ + | GET /mem | return mem usage stats used by the process since | + | | call to PUT. | + +---------------------+-----------------------------------------------------+ + +Environment Variables +--------------------- + +The following environment variables modify the behaviour of the resource +monitor: + + HTTP_PORT - use that port to listen for HTTP requests (default: 7001). +""" + +import json +import os +import psutil + +from twisted.application import service, internet +from twisted.web import resource, server +from twisted.internet.task import LoopingCall +from twisted.logger import Logger + +from test_controller.server.user_dbs import ensure_dbs + + +DEFAULT_HTTP_PORT = 7001 +SUCCESS = json.dumps({'success': True}) + +logger = Logger(__name__) + + +# +# Resource Watcher classes (mem, cpu) +# + +class ResourceWatcher(object): + + def __init__(self, pid): + logger.info('%s started for process with PID %d' + % (self.__class__.__name__, int(pid))) + self.loop_call = LoopingCall(self._loop) + self.process = psutil.Process(pid) + self.data = [] + self.result = None + + @property + def running(self): + return self.loop_call.running + + def _loop(self): + pass + + def start(self): + self._start() + d = self.loop_call.start(self.interval) + d.addCallback(self._stop) + return d + + def _start(self): + pass + + def _stop(self, _): + raise NotImplementedError + + def stop(self): + self.loop_call.stop() + + +class CpuWatcher(ResourceWatcher): + + interval = 1 + + def _start(self): + self.process.cpu_percent() + + def _stop(self, _): + self.result = {'cpu_percent': self.process.cpu_percent()} + + +def _mean(l): + return float(sum(l)) / len(l) + + +def _std(l): + if len(l) <= 1: + return 0 + mean = _mean(l) + squares = [(x - mean) ** 2 for x in l] + return (sum(squares) / (len(l) - 1)) ** 0.5 + + +class MemoryWatcher(ResourceWatcher): + + interval = 0.1 + + def _loop(self): + sample = self.process.memory_percent(memtype='rss') + self.data.append(sample) + + def _stop(self, _): + stats = { + 'max': max(self.data), + 'min': min(self.data), + 'mean': _mean(self.data), + 'std': _std(self.data), + } + self.result = { + 'interval': self.interval, + 'samples': self.data, + 'memory_percent': stats, + } + + +# +# Resources for use with "twistd web" +# + +class MissingPidError(Exception): + + def __str__(self): + return "No PID was passed in request" + + +class InvalidPidError(Exception): + + def __init__(self, pid): + Exception.__init__(self) + self.pid = pid + + def __str__(self): + return "Invalid PID: %r" % self.pid + + +class MonitorResource(resource.Resource): + """ + A generic resource-monitor web resource. + """ + + isLeaf = 1 + + def __init__(self, watcherClass): + resource.Resource.__init__(self) + self.watcherClass = watcherClass + self.watcher = None + + def _get_pid(self, request): + if 'pid' not in request.args: + raise MissingPidError() + pid = request.args['pid'].pop() + if not pid.isdigit(): + raise InvalidPidError(pid) + return int(pid) + + def render_POST(self, request): + try: + pid = self._get_pid(request) + except Exception as e: + request.setResponseCode(500) + logger.error('Error processing request: %r' % e) + return json.dumps({'error': str(e)}) + self._stop_watcher() + try: + self.watcher = self.watcherClass(pid) + self.watcher.start() + return SUCCESS + except psutil.NoSuchProcess as e: + request.setResponseCode(404) + return json.dumps({'error': str(e)}) + + def render_GET(self, request): + self._stop_watcher() + if self.watcher: + return json.dumps(self.watcher.result) + return json.dumps({}) + + def _stop_watcher(self): + if self.watcher and self.watcher.running: + self.watcher.stop() + + +class SetupResource(resource.Resource): + + def render_POST(self, request): + d = ensure_dbs() + d.addCallback(self._success, request) + d.addErrback(self._error, request) + return server.NOT_DONE_YET + + def _success(self, _, request): + request.write(SUCCESS) + request.finish() + + def _error(self, e, request): + logger.error('Error processing request: %s' % e.getErrorMessage()) + request.setResponseCode(500) + request.write(json.dumps({'error': str(e)})) + request.finish() + + +class Root(resource.Resource): + + def __init__(self): + resource.Resource.__init__(self) + self.putChild('mem', MonitorResource(MemoryWatcher)) + self.putChild('cpu', MonitorResource(CpuWatcher)) + self.putChild('setup', SetupResource()) + + +application = service.Application("Resource Monitor") +site = server.Site(Root()) +port = os.environ.get('HTTP_PORT', DEFAULT_HTTP_PORT) +service = internet.TCPServer(port, site) +service.setServiceParent(application) diff --git a/scripts/scalability/test_controller/server/user_dbs.py b/scripts/scalability/test_controller/server/user_dbs.py new file mode 100755 index 00000000..a1a9c222 --- /dev/null +++ b/scripts/scalability/test_controller/server/user_dbs.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python + +# Handle creation of user databases for scalability tests. + +import argparse +import treq + +from functools import partial +from urlparse import urljoin + +from twisted.internet import reactor, defer +from twisted.logger import Logger + +COUCH_URL = "http://127.0.0.1:5984" +CREATE = 1000 + + +logger = Logger() + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument('--couch-url', default=COUCH_URL, + help='The URL to the CouchDB server.') + parser.add_argument('--create', default=CREATE, + help='The number of databases to create.') + return parser.parse_args() + + +def get_db_names(create): + dbs = [] + for i in xrange(create): + dbname = 'user-%d' % i + dbs.append(dbname) + return dbs + + +semaphore = defer.DeferredSemaphore(20) + + +def _log(db, action, res): + logger.info('table %s %s' % (db, action)) + return res + + +@defer.inlineCallbacks +def delete_dbs(dbs): + deferreds = [] + for db in dbs: + d = semaphore.run(treq.delete, urljoin(COUCH_URL, db)) + logfun = partial(_log, db, 'deleted') + d.addCallback(logfun) + deferreds.append(d) + responses = yield defer.gatherResults(deferreds) + codes = map(lambda r: r.code, responses) + assert all(map(lambda c: c == 200 or c == 404, codes)) + + +@defer.inlineCallbacks +def create_dbs(dbs): + deferreds = [] + for db in dbs: + d = semaphore.run(treq.put, urljoin(COUCH_URL, db)) + logfun = partial(_log, db, 'created') + d.addCallback(logfun) + deferreds.append(d) + responses = yield defer.gatherResults(deferreds) + codes = map(lambda r: r.code, responses) + assert all(map(lambda c: c == 201, codes)) + + +@defer.inlineCallbacks +def ensure_dbs(couch_url=COUCH_URL, create=CREATE): + dbs = get_db_names(create) + yield delete_dbs(dbs) + yield create_dbs(dbs) + + +@defer.inlineCallbacks +def main(couch_url, create): + yield ensure_dbs(couch_url, create) + reactor.stop() + + +if __name__ == '__main__': + args = parse_args() + d = main(args.couch_url, args.create) + reactor.run() |