diff options
Diffstat (limited to 'examples/pubsub')
-rw-r--r-- | examples/pubsub/publisher.py | 57 | ||||
-rw-r--r-- | examples/pubsub/subscriber.py | 74 | ||||
-rwxr-xr-x | examples/pubsub/topics_pub.py | 64 | ||||
-rwxr-xr-x | examples/pubsub/topics_sub.py | 56 |
4 files changed, 0 insertions, 251 deletions
diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py deleted file mode 100644 index a2ce6c9..0000000 --- a/examples/pubsub/publisher.py +++ /dev/null @@ -1,57 +0,0 @@ -"""A test that publishes NumPy arrays. - -Uses REQ/REP (on PUB/SUB socket + 1) to synchronize -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import sys -import time - -import zmq -import numpy - -def sync(bind_to): - # use bind socket + 1 - sync_with = ':'.join(bind_to.split(':')[:-1] + - [str(int(bind_to.split(':')[-1]) + 1)]) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REP) - s.bind(sync_with) - print "Waiting for subscriber to connect..." - s.recv() - print " Done." - s.send('GO') - -def main(): - if len (sys.argv) != 4: - print 'usage: publisher <bind-to> <array-size> <array-count>' - sys.exit (1) - - try: - bind_to = sys.argv[1] - array_size = int(sys.argv[2]) - array_count = int (sys.argv[3]) - except (ValueError, OverflowError), e: - print 'array-size and array-count must be integers' - sys.exit (1) - - ctx = zmq.Context() - s = ctx.socket(zmq.PUB) - s.bind(bind_to) - - sync(bind_to) - - print "Sending arrays..." - for i in range(array_count): - a = numpy.random.rand(array_size, array_size) - s.send_pyobj(a) - print " Done." - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py deleted file mode 100644 index b996ad8..0000000 --- a/examples/pubsub/subscriber.py +++ /dev/null @@ -1,74 +0,0 @@ -"""A test that subscribes to NumPy arrays. - -Uses REQ/REP (on PUB/SUB socket + 1) to synchronize -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - - -import sys -import time - -import zmq -import numpy - -def sync(connect_to): - # use connect socket + 1 - sync_with = ':'.join(connect_to.split(':')[:-1] + - [str(int(connect_to.split(':')[-1]) + 1)] - ) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REQ) - s.connect(sync_with) - s.send('READY') - s.recv() - -def main(): - if len (sys.argv) != 3: - print 'usage: subscriber <connect_to> <array-count>' - sys.exit (1) - - try: - connect_to = sys.argv[1] - array_count = int (sys.argv[2]) - except (ValueError, OverflowError), e: - print 'array-count must be integers' - sys.exit (1) - - ctx = zmq.Context() - s = ctx.socket(zmq.SUB) - s.connect(connect_to) - s.setsockopt(zmq.SUBSCRIBE,'') - - sync(connect_to) - - start = time.clock() - - print "Receiving arrays..." - for i in range(array_count): - a = s.recv_pyobj() - print " Done." - - end = time.clock() - - elapsed = (end - start) * 1000000 - if elapsed == 0: - elapsed = 1 - throughput = (1000000.0 * float (array_count)) / float (elapsed) - message_size = a.nbytes - megabits = float (throughput * message_size * 8) / 1000000 - - print "message size: %.0f [B]" % (message_size, ) - print "array count: %.0f" % (array_count, ) - print "mean throughput: %.0f [msg/s]" % (throughput, ) - print "mean throughput: %.3f [Mb/s]" % (megabits, ) - - time.sleep(1.0) - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py deleted file mode 100755 index 73b3d1c..0000000 --- a/examples/pubsub/topics_pub.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -"""Simple example of publish/subscribe illustrating topics. - -Publisher and subscriber can be started in any order, though if publisher -starts first, any messages sent before subscriber starts are lost. More than -one subscriber can listen, and they can listen to different topics. - -Topic filtering is done simply on the start of the string, e.g. listening to -'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to -catch 'weather'. -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import itertools -import sys -import time - -import zmq - -def main(): - if len (sys.argv) != 2: - print 'usage: publisher <bind-to>' - sys.exit (1) - - bind_to = sys.argv[1] - - all_topics = ['sports.general','sports.football','sports.basketball', - 'stocks.general','stocks.GOOG','stocks.AAPL', - 'weather'] - - ctx = zmq.Context() - s = ctx.socket(zmq.PUB) - s.bind(bind_to) - - print "Starting broadcast on topics:" - print " %s" % all_topics - print "Hit Ctrl-C to stop broadcasting." - print "Waiting so subscriber sockets can connect..." - print - time.sleep(1.0) - - msg_counter = itertools.count() - try: - for topic in itertools.cycle(all_topics): - msg_body = str(msg_counter.next()) - print ' Topic: %s, msg:%s' % (topic, msg_body) - s.send_multipart([topic, msg_body]) - # short wait so we don't hog the cpu - time.sleep(0.1) - except KeyboardInterrupt: - pass - - print "Waiting for message queues to flush..." - time.sleep(0.5) - print "Done." - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py deleted file mode 100755 index 4a61fb5..0000000 --- a/examples/pubsub/topics_sub.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python -"""Simple example of publish/subscribe illustrating topics. - -Publisher and subscriber can be started in any order, though if publisher -starts first, any messages sent before subscriber starts are lost. More than -one subscriber can listen, and they can listen to different topics. - -Topic filtering is done simply on the start of the string, e.g. listening to -'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to -catch 'weather'. -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger, Fernando Perez -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import sys -import time - -import zmq -import numpy - -def main(): - if len (sys.argv) < 2: - print 'usage: subscriber <connect_to> [topic topic ...]' - sys.exit (1) - - connect_to = sys.argv[1] - topics = sys.argv[2:] - - ctx = zmq.Context() - s = ctx.socket(zmq.SUB) - s.connect(connect_to) - - # manage subscriptions - if not topics: - print "Receiving messages on ALL topics..." - s.setsockopt(zmq.SUBSCRIBE,'') - else: - print "Receiving messages on topics: %s ..." % topics - for t in topics: - s.setsockopt(zmq.SUBSCRIBE,t) - print - try: - while True: - topic, msg = s.recv_multipart() - print ' Topic: %s, msg:%s' % (topic, msg) - except KeyboardInterrupt: - pass - print "Done." - -if __name__ == "__main__": - main() |