From cce638a8adf4e045ca5505afea4bda57753c31dd Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Mon, 11 Aug 2014 16:33:29 -0400 Subject: initial import of debian package --- examples/heartbeat/heart.py | 34 +++++++++++++++ examples/heartbeat/heartbeater.py | 90 +++++++++++++++++++++++++++++++++++++++ examples/heartbeat/ping.py | 34 +++++++++++++++ examples/heartbeat/pong.py | 34 +++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 examples/heartbeat/heart.py create mode 100644 examples/heartbeat/heartbeater.py create mode 100644 examples/heartbeat/ping.py create mode 100644 examples/heartbeat/pong.py (limited to 'examples/heartbeat') diff --git a/examples/heartbeat/heart.py b/examples/heartbeat/heart.py new file mode 100644 index 0000000..175370e --- /dev/null +++ b/examples/heartbeat/heart.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""This launches an echoing rep socket device, +and runs a blocking numpy action. The rep socket should +remain responsive to pings during this time. Use heartbeater.py to +ping this heart, and see the responsiveness. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq +from zmq import devices + +ctx = zmq.Context() + +dev = devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER) +dev.setsockopt_in(zmq.SUBSCRIBE, "") +dev.connect_in('tcp://127.0.0.1:5555') +dev.connect_out('tcp://127.0.0.1:5556') +dev.start() + +#wait for connections +time.sleep(1) + +A = numpy.random.random((2**11,2**11)) +print "starting blocking loop" +while True: + tic = time.time() + numpy.dot(A,A.transpose()) + print "blocked for %.3f s"%(time.time()-tic) + diff --git a/examples/heartbeat/heartbeater.py b/examples/heartbeat/heartbeater.py new file mode 100644 index 0000000..180828a --- /dev/null +++ b/examples/heartbeat/heartbeater.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +""" + +For use with heart.py + +A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts +are tracked based on their DEALER identities. + +You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding. + +Authors +------- +* MinRK +""" + +import time +import zmq +from zmq.eventloop import ioloop, zmqstream + + +class HeartBeater(object): + """A basic HeartBeater class + pingstream: a PUB stream + pongstream: an ROUTER stream""" + + def __init__(self, loop, pingstream, pongstream, period=1000): + self.loop = loop + self.period = period + + self.pingstream = pingstream + self.pongstream = pongstream + self.pongstream.on_recv(self.handle_pong) + + self.hearts = set() + self.responses = set() + self.lifetime = 0 + self.tic = time.time() + + self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop) + self.caller.start() + + def beat(self): + toc = time.time() + self.lifetime += toc-self.tic + self.tic = toc + print self.lifetime + # self.message = str(self.lifetime) + goodhearts = self.hearts.intersection(self.responses) + heartfailures = self.hearts.difference(goodhearts) + newhearts = self.responses.difference(goodhearts) + # print newhearts, goodhearts, heartfailures + map(self.handle_new_heart, newhearts) + map(self.handle_heart_failure, heartfailures) + self.responses = set() + print "%i beating hearts: %s"%(len(self.hearts),self.hearts) + self.pingstream.send(str(self.lifetime)) + + def handle_new_heart(self, heart): + print "yay, got new heart %s!"%heart + self.hearts.add(heart) + + def handle_heart_failure(self, heart): + print "Heart %s failed :("%heart + self.hearts.remove(heart) + + + def handle_pong(self, msg): + "if heart is beating" + if msg[1] == str(self.lifetime): + self.responses.add(msg[0]) + else: + print "got bad heartbeat (possibly old?): %s"%msg[1] + +# sub.setsockopt(zmq.SUBSCRIBE) + + +if __name__ == '__main__': + loop = ioloop.IOLoop() + context = zmq.Context() + pub = context.socket(zmq.PUB) + pub.bind('tcp://127.0.0.1:5555') + router = context.socket(zmq.ROUTER) + router.bind('tcp://127.0.0.1:5556') + + outstream = zmqstream.ZMQStream(pub, loop) + instream = zmqstream.ZMQStream(router, loop) + + hb = HeartBeater(loop, outstream, instream) + + loop.start() diff --git a/examples/heartbeat/ping.py b/examples/heartbeat/ping.py new file mode 100644 index 0000000..933a39a --- /dev/null +++ b/examples/heartbeat/ping.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""For use with pong.py + +This script simply pings a process started by pong.py or tspong.py, to +demonstrate that zmq remains responsive while Python blocks. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq + +ctx = zmq.Context() + +req = ctx.socket(zmq.REQ) +req.connect('tcp://127.0.0.1:10111') + +#wait for connects +time.sleep(1) +n=0 +while True: + time.sleep(numpy.random.random()) + for i in range(4): + n+=1 + msg = 'ping %i'%n + tic = time.time() + req.send(msg) + resp = req.recv() + print "%s: %.2f ms" % (msg, 1000*(time.time()-tic)) + assert msg == resp + diff --git a/examples/heartbeat/pong.py b/examples/heartbeat/pong.py new file mode 100644 index 0000000..47efb3a --- /dev/null +++ b/examples/heartbeat/pong.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""This launches an echoing rep socket device using +zmq.devices.ThreadDevice, and runs a blocking numpy action. +The rep socket should remain responsive to pings during this time. + +Use ping.py to see how responsive it is. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq +from zmq import devices + +ctx = zmq.Context() + +dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1) +dev.bind_in('tcp://127.0.0.1:10111') +dev.setsockopt_in(zmq.IDENTITY, "whoda") +dev.start() + +#wait for connections +time.sleep(1) + +A = numpy.random.random((2**11,2**12)) +print "starting blocking loop" +while True: + tic = time.time() + numpy.dot(A,A.transpose()) + print "blocked for %.3f s"%(time.time()-tic) + -- cgit v1.2.3