summaryrefslogtreecommitdiff
path: root/examples/heartbeat
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
committerMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
commit7d5c3dcd969161322deed6c43f8a6a3cb92c3369 (patch)
tree109b05c88c7252d7609ef324d62ef9dd7f06123f /examples/heartbeat
parent44be832c5708baadd146cb954befbc3dcad8d463 (diff)
upgrade to 14.4.1upstream/14.4.1
Diffstat (limited to 'examples/heartbeat')
-rw-r--r--examples/heartbeat/heart.py34
-rw-r--r--examples/heartbeat/heartbeater.py90
-rw-r--r--examples/heartbeat/ping.py35
-rw-r--r--examples/heartbeat/pong.py34
4 files changed, 193 insertions, 0 deletions
diff --git a/examples/heartbeat/heart.py b/examples/heartbeat/heart.py
new file mode 100644
index 0000000..175370e
--- /dev/null
+++ b/examples/heartbeat/heart.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""This launches an echoing rep socket device,
+and runs a blocking numpy action. The rep socket should
+remain responsive to pings during this time. Use heartbeater.py to
+ping this heart, and see the responsiveness.
+
+Authors
+-------
+* MinRK
+"""
+
+import time
+import numpy
+import zmq
+from zmq import devices
+
+ctx = zmq.Context()
+
+dev = devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER)
+dev.setsockopt_in(zmq.SUBSCRIBE, "")
+dev.connect_in('tcp://127.0.0.1:5555')
+dev.connect_out('tcp://127.0.0.1:5556')
+dev.start()
+
+#wait for connections
+time.sleep(1)
+
+A = numpy.random.random((2**11,2**11))
+print "starting blocking loop"
+while True:
+ tic = time.time()
+ numpy.dot(A,A.transpose())
+ print "blocked for %.3f s"%(time.time()-tic)
+
diff --git a/examples/heartbeat/heartbeater.py b/examples/heartbeat/heartbeater.py
new file mode 100644
index 0000000..180828a
--- /dev/null
+++ b/examples/heartbeat/heartbeater.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+
+For use with heart.py
+
+A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts
+are tracked based on their DEALER identities.
+
+You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding.
+
+Authors
+-------
+* MinRK
+"""
+
+import time
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+
+class HeartBeater(object):
+ """A basic HeartBeater class
+ pingstream: a PUB stream
+ pongstream: an ROUTER stream"""
+
+ def __init__(self, loop, pingstream, pongstream, period=1000):
+ self.loop = loop
+ self.period = period
+
+ self.pingstream = pingstream
+ self.pongstream = pongstream
+ self.pongstream.on_recv(self.handle_pong)
+
+ self.hearts = set()
+ self.responses = set()
+ self.lifetime = 0
+ self.tic = time.time()
+
+ self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
+ self.caller.start()
+
+ def beat(self):
+ toc = time.time()
+ self.lifetime += toc-self.tic
+ self.tic = toc
+ print self.lifetime
+ # self.message = str(self.lifetime)
+ goodhearts = self.hearts.intersection(self.responses)
+ heartfailures = self.hearts.difference(goodhearts)
+ newhearts = self.responses.difference(goodhearts)
+ # print newhearts, goodhearts, heartfailures
+ map(self.handle_new_heart, newhearts)
+ map(self.handle_heart_failure, heartfailures)
+ self.responses = set()
+ print "%i beating hearts: %s"%(len(self.hearts),self.hearts)
+ self.pingstream.send(str(self.lifetime))
+
+ def handle_new_heart(self, heart):
+ print "yay, got new heart %s!"%heart
+ self.hearts.add(heart)
+
+ def handle_heart_failure(self, heart):
+ print "Heart %s failed :("%heart
+ self.hearts.remove(heart)
+
+
+ def handle_pong(self, msg):
+ "if heart is beating"
+ if msg[1] == str(self.lifetime):
+ self.responses.add(msg[0])
+ else:
+ print "got bad heartbeat (possibly old?): %s"%msg[1]
+
+# sub.setsockopt(zmq.SUBSCRIBE)
+
+
+if __name__ == '__main__':
+ loop = ioloop.IOLoop()
+ context = zmq.Context()
+ pub = context.socket(zmq.PUB)
+ pub.bind('tcp://127.0.0.1:5555')
+ router = context.socket(zmq.ROUTER)
+ router.bind('tcp://127.0.0.1:5556')
+
+ outstream = zmqstream.ZMQStream(pub, loop)
+ instream = zmqstream.ZMQStream(router, loop)
+
+ hb = HeartBeater(loop, outstream, instream)
+
+ loop.start()
diff --git a/examples/heartbeat/ping.py b/examples/heartbeat/ping.py
new file mode 100644
index 0000000..797cb8c
--- /dev/null
+++ b/examples/heartbeat/ping.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+"""For use with pong.py
+
+This script simply pings a process started by pong.py or tspong.py, to
+demonstrate that zmq remains responsive while Python blocks.
+
+Authors
+-------
+* MinRK
+"""
+from __future__ import print_function
+
+import sys
+import time
+import numpy
+import zmq
+
+ctx = zmq.Context()
+
+req = ctx.socket(zmq.REQ)
+req.connect('tcp://127.0.0.1:10111')
+
+#wait for connects
+time.sleep(1)
+n=0
+while True:
+ time.sleep(numpy.random.random())
+ for i in range(4):
+ n+=1
+ msg = 'ping %i' % n
+ tic = time.time()
+ req.send_string(msg)
+ resp = req.recv_string()
+ print("%s: %.2f ms" % (msg, 1000*(time.time()-tic)))
+ assert msg == resp
diff --git a/examples/heartbeat/pong.py b/examples/heartbeat/pong.py
new file mode 100644
index 0000000..524f394
--- /dev/null
+++ b/examples/heartbeat/pong.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""This launches an echoing rep socket device using
+zmq.devices.ThreadDevice, and runs a blocking numpy action.
+The rep socket should remain responsive to pings during this time.
+
+Use ping.py to see how responsive it is.
+
+Authors
+-------
+* MinRK
+"""
+from __future__ import print_function
+
+import time
+import numpy
+import zmq
+from zmq import devices
+
+ctx = zmq.Context()
+
+dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
+dev.bind_in('tcp://127.0.0.1:10111')
+dev.setsockopt_in(zmq.IDENTITY, b"whoda")
+dev.start()
+
+#wait for connections
+time.sleep(1)
+
+A = numpy.random.random((2**11,2**12))
+print("starting blocking loop")
+while True:
+ tic = time.time()
+ numpy.dot(A,A.transpose())
+ print("blocked for %.3f s"%(time.time()-tic))