summaryrefslogtreecommitdiff
path: root/examples/pubsub
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
committerMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
commit7d5c3dcd969161322deed6c43f8a6a3cb92c3369 (patch)
tree109b05c88c7252d7609ef324d62ef9dd7f06123f /examples/pubsub
parent44be832c5708baadd146cb954befbc3dcad8d463 (diff)
upgrade to 14.4.1upstream/14.4.1
Diffstat (limited to 'examples/pubsub')
-rw-r--r--examples/pubsub/publisher.py57
-rw-r--r--examples/pubsub/subscriber.py74
-rwxr-xr-xexamples/pubsub/topics_pub.py64
-rwxr-xr-xexamples/pubsub/topics_sub.py56
4 files changed, 251 insertions, 0 deletions
diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py
new file mode 100644
index 0000000..a2ce6c9
--- /dev/null
+++ b/examples/pubsub/publisher.py
@@ -0,0 +1,57 @@
+"""A test that publishes NumPy arrays.
+
+Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def sync(bind_to):
+ # use bind socket + 1
+ sync_with = ':'.join(bind_to.split(':')[:-1] +
+ [str(int(bind_to.split(':')[-1]) + 1)])
+ ctx = zmq.Context.instance()
+ s = ctx.socket(zmq.REP)
+ s.bind(sync_with)
+ print "Waiting for subscriber to connect..."
+ s.recv()
+ print " Done."
+ s.send('GO')
+
+def main():
+ if len (sys.argv) != 4:
+ print 'usage: publisher <bind-to> <array-size> <array-count>'
+ sys.exit (1)
+
+ try:
+ bind_to = sys.argv[1]
+ array_size = int(sys.argv[2])
+ array_count = int (sys.argv[3])
+ except (ValueError, OverflowError), e:
+ print 'array-size and array-count must be integers'
+ sys.exit (1)
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PUB)
+ s.bind(bind_to)
+
+ sync(bind_to)
+
+ print "Sending arrays..."
+ for i in range(array_count):
+ a = numpy.random.rand(array_size, array_size)
+ s.send_pyobj(a)
+ print " Done."
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py
new file mode 100644
index 0000000..b996ad8
--- /dev/null
+++ b/examples/pubsub/subscriber.py
@@ -0,0 +1,74 @@
+"""A test that subscribes to NumPy arrays.
+
+Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def sync(connect_to):
+ # use connect socket + 1
+ sync_with = ':'.join(connect_to.split(':')[:-1] +
+ [str(int(connect_to.split(':')[-1]) + 1)]
+ )
+ ctx = zmq.Context.instance()
+ s = ctx.socket(zmq.REQ)
+ s.connect(sync_with)
+ s.send('READY')
+ s.recv()
+
+def main():
+ if len (sys.argv) != 3:
+ print 'usage: subscriber <connect_to> <array-count>'
+ sys.exit (1)
+
+ try:
+ connect_to = sys.argv[1]
+ array_count = int (sys.argv[2])
+ except (ValueError, OverflowError), e:
+ print 'array-count must be integers'
+ sys.exit (1)
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.SUB)
+ s.connect(connect_to)
+ s.setsockopt(zmq.SUBSCRIBE,'')
+
+ sync(connect_to)
+
+ start = time.clock()
+
+ print "Receiving arrays..."
+ for i in range(array_count):
+ a = s.recv_pyobj()
+ print " Done."
+
+ end = time.clock()
+
+ elapsed = (end - start) * 1000000
+ if elapsed == 0:
+ elapsed = 1
+ throughput = (1000000.0 * float (array_count)) / float (elapsed)
+ message_size = a.nbytes
+ megabits = float (throughput * message_size * 8) / 1000000
+
+ print "message size: %.0f [B]" % (message_size, )
+ print "array count: %.0f" % (array_count, )
+ print "mean throughput: %.0f [msg/s]" % (throughput, )
+ print "mean throughput: %.3f [Mb/s]" % (megabits, )
+
+ time.sleep(1.0)
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py
new file mode 100755
index 0000000..73b3d1c
--- /dev/null
+++ b/examples/pubsub/topics_pub.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+"""Simple example of publish/subscribe illustrating topics.
+
+Publisher and subscriber can be started in any order, though if publisher
+starts first, any messages sent before subscriber starts are lost. More than
+one subscriber can listen, and they can listen to different topics.
+
+Topic filtering is done simply on the start of the string, e.g. listening to
+'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
+catch 'weather'.
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import itertools
+import sys
+import time
+
+import zmq
+
+def main():
+ if len (sys.argv) != 2:
+ print 'usage: publisher <bind-to>'
+ sys.exit (1)
+
+ bind_to = sys.argv[1]
+
+ all_topics = ['sports.general','sports.football','sports.basketball',
+ 'stocks.general','stocks.GOOG','stocks.AAPL',
+ 'weather']
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PUB)
+ s.bind(bind_to)
+
+ print "Starting broadcast on topics:"
+ print " %s" % all_topics
+ print "Hit Ctrl-C to stop broadcasting."
+ print "Waiting so subscriber sockets can connect..."
+ print
+ time.sleep(1.0)
+
+ msg_counter = itertools.count()
+ try:
+ for topic in itertools.cycle(all_topics):
+ msg_body = str(msg_counter.next())
+ print ' Topic: %s, msg:%s' % (topic, msg_body)
+ s.send_multipart([topic, msg_body])
+ # short wait so we don't hog the cpu
+ time.sleep(0.1)
+ except KeyboardInterrupt:
+ pass
+
+ print "Waiting for message queues to flush..."
+ time.sleep(0.5)
+ print "Done."
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py
new file mode 100755
index 0000000..4a61fb5
--- /dev/null
+++ b/examples/pubsub/topics_sub.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+"""Simple example of publish/subscribe illustrating topics.
+
+Publisher and subscriber can be started in any order, though if publisher
+starts first, any messages sent before subscriber starts are lost. More than
+one subscriber can listen, and they can listen to different topics.
+
+Topic filtering is done simply on the start of the string, e.g. listening to
+'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
+catch 'weather'.
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger, Fernando Perez
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def main():
+ if len (sys.argv) < 2:
+ print 'usage: subscriber <connect_to> [topic topic ...]'
+ sys.exit (1)
+
+ connect_to = sys.argv[1]
+ topics = sys.argv[2:]
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.SUB)
+ s.connect(connect_to)
+
+ # manage subscriptions
+ if not topics:
+ print "Receiving messages on ALL topics..."
+ s.setsockopt(zmq.SUBSCRIBE,'')
+ else:
+ print "Receiving messages on topics: %s ..." % topics
+ for t in topics:
+ s.setsockopt(zmq.SUBSCRIBE,t)
+ print
+ try:
+ while True:
+ topic, msg = s.recv_multipart()
+ print ' Topic: %s, msg:%s' % (topic, msg)
+ except KeyboardInterrupt:
+ pass
+ print "Done."
+
+if __name__ == "__main__":
+ main()