1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
#!/usr/bin/env python
"""
For use with heart.py
A basic heartbeater using PUB and ROUTER sockets. Pings are sent out on the PUB socket, and hearts
are tracked based on their DEALER identities.
You can start many hearts with heart.py; the heartbeater will monitor all of them and notice when they stop responding.
Authors
-------
* MinRK
"""
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
class HeartBeater(object):
    """A basic HeartBeater class.

    pingstream: a PUB stream; every beat publishes the current lifetime on it
    pongstream: a ROUTER stream; replies received on it go to ``handle_pong``

    ``hearts`` holds the identities currently considered alive and
    ``responses`` the identities that answered the most recent ping.  Each
    call to ``beat`` reconciles the two sets and then sends the next ping.
    """

    def __init__(self, loop, pingstream, pongstream, period=1000):
        """Wire up the streams and start beating.

        loop: the event loop handed to the periodic callback
        pingstream: stream used to send pings
        pongstream: stream whose ``on_recv`` callback becomes ``handle_pong``
        period: interval passed to ``ioloop.PeriodicCallback`` (milliseconds
            in tornado's API)
        """
        self.loop = loop
        self.period = period

        self.pingstream = pingstream
        self.pongstream = pongstream
        self.pongstream.on_recv(self.handle_pong)

        self.hearts = set()
        self.responses = set()

        # Accumulated running time; its string form doubles as the ping
        # payload that hearts must echo back.
        self.lifetime = 0
        self.tic = time.time()

        # NOTE(review): recent tornado releases dropped the io_loop argument
        # of PeriodicCallback -- confirm against the installed version.
        self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
        self.caller.start()

    def _ping_payload(self):
        """Return the current lifetime as the byte string used in pings."""
        # .encode() is a no-op for these ASCII digits on Python 2 and yields
        # the bytes object that pyzmq requires on Python 3.
        return str(self.lifetime).encode('ascii')

    def beat(self):
        """Reconcile responses with known hearts, then send the next ping."""
        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        print(self.lifetime)

        goodhearts = self.hearts.intersection(self.responses)
        heartfailures = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops: map() is lazy on Python 3, so using it for its side
        # effects would silently skip every handler call.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        self.responses = set()
        print("%i beating hearts: %s" % (len(self.hearts), self.hearts))
        self.pingstream.send(self._ping_payload())

    def handle_new_heart(self, heart):
        """Start tracking a heart that answered for the first time."""
        print("yay, got new heart %s!" % heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Stop tracking a heart that missed the latest ping."""
        print("Heart %s failed :(" % heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """Record a reply if it echoes the current ping.

        msg: sequence of frames; ``msg[0]`` is stored as the heart's identity
            and ``msg[1]`` must equal the payload of the latest ping.
        """
        if msg[1] == self._ping_payload():
            self.responses.add(msg[0])
        else:
            print("got bad heartbeat (possibly old?): %s" % msg[1])
# sub.setsockopt(zmq.SUBSCRIBE)
if __name__ == '__main__':
    # Event loop and zmq context shared by both sockets.
    event_loop = ioloop.IOLoop()
    zmq_context = zmq.Context()

    # PUB socket: pings are sent out from here.
    ping_socket = zmq_context.socket(zmq.PUB)
    ping_socket.bind('tcp://127.0.0.1:5555')

    # ROUTER socket: replies from the hearts arrive here.
    pong_socket = zmq_context.socket(zmq.ROUTER)
    pong_socket.bind('tcp://127.0.0.1:5556')

    ping_stream = zmqstream.ZMQStream(ping_socket, event_loop)
    pong_stream = zmqstream.ZMQStream(pong_socket, event_loop)

    # Keep a reference to the heartbeater for as long as the loop runs.
    heartbeater = HeartBeater(event_loop, ping_stream, pong_stream)
    event_loop.start()
|