From 7d5c3dcd969161322deed6c43f8a6a3cb92c3369 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:53:55 -0500 Subject: upgrade to 14.4.1 --- examples/logger/zmqlogger.py | 70 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 examples/logger/zmqlogger.py (limited to 'examples/logger') diff --git a/examples/logger/zmqlogger.py b/examples/logger/zmqlogger.py new file mode 100644 index 0000000..c55b51b --- /dev/null +++ b/examples/logger/zmqlogger.py @@ -0,0 +1,70 @@ +""" +Simple example of using zmq log handlers + +This starts a number of subprocesses with PUBHandlers that generate +log messages at a regular interval. The main process has a SUB socket, +which aggregates and logs all of the messages to the root logger. +""" + +import logging +from multiprocessing import Process +import os +import random +import sys +import time + +import zmq +from zmq.log.handlers import PUBHandler + +LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL) + +def sub_logger(port, level=logging.DEBUG): + ctx = zmq.Context() + sub = ctx.socket(zmq.SUB) + sub.bind('tcp://127.0.0.1:%i' % port) + sub.setsockopt(zmq.SUBSCRIBE, "") + logging.basicConfig(level=level) + + while True: + level, message = sub.recv_multipart() + if message.endswith('\n'): + # trim trailing newline, which will get appended again + message = message[:-1] + log = getattr(logging, level.lower()) + log(message) + +def log_worker(port, interval=1, level=logging.DEBUG): + ctx = zmq.Context() + pub = ctx.socket(zmq.PUB) + pub.connect('tcp://127.0.0.1:%i' % port) + + logger = logging.getLogger(str(os.getpid())) + logger.setLevel(level) + handler = PUBHandler(pub) + logger.addHandler(handler) + print "starting logger at %i with level=%s" % (os.getpid(), level) + + while True: + level = random.choice(LOG_LEVELS) + logger.log(level, "Hello from %i!" % os.getpid()) + time.sleep(interval) + +if __name__ == '__main__': + if len(sys.argv) > 1: + n = int(sys.argv[1]) + else: + n = 2 + + port = 5555 + + # start the log generators + workers = [ Process(target=log_worker, args=(port,), kwargs=dict(level=random.choice(LOG_LEVELS))) for i in range(n) ] + [ w.start() for w in workers ] + + # start the log watcher + try: + sub_logger(port) + except KeyboardInterrupt: + pass + finally: + [ w.terminate() for w in workers ] -- cgit v1.2.3