summaryrefslogtreecommitdiff
path: root/examples/pubsub
diff options
context:
space:
mode:
Diffstat (limited to 'examples/pubsub')
-rw-r--r--examples/pubsub/publisher.py57
-rw-r--r--examples/pubsub/subscriber.py74
-rwxr-xr-xexamples/pubsub/topics_pub.py64
-rwxr-xr-xexamples/pubsub/topics_sub.py56
4 files changed, 0 insertions, 251 deletions
diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py
deleted file mode 100644
index a2ce6c9..0000000
--- a/examples/pubsub/publisher.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""A test that publishes NumPy arrays.
-
-Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
-"""
-
-#-----------------------------------------------------------------------------
-# Copyright (c) 2010 Brian Granger
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-import sys
-import time
-
-import zmq
-import numpy
-
-def sync(bind_to):
- # use bind socket + 1
- sync_with = ':'.join(bind_to.split(':')[:-1] +
- [str(int(bind_to.split(':')[-1]) + 1)])
- ctx = zmq.Context.instance()
- s = ctx.socket(zmq.REP)
- s.bind(sync_with)
- print "Waiting for subscriber to connect..."
- s.recv()
- print " Done."
- s.send('GO')
-
-def main():
- if len (sys.argv) != 4:
- print 'usage: publisher <bind-to> <array-size> <array-count>'
- sys.exit (1)
-
- try:
- bind_to = sys.argv[1]
- array_size = int(sys.argv[2])
- array_count = int (sys.argv[3])
- except (ValueError, OverflowError), e:
- print 'array-size and array-count must be integers'
- sys.exit (1)
-
- ctx = zmq.Context()
- s = ctx.socket(zmq.PUB)
- s.bind(bind_to)
-
- sync(bind_to)
-
- print "Sending arrays..."
- for i in range(array_count):
- a = numpy.random.rand(array_size, array_size)
- s.send_pyobj(a)
- print " Done."
-
-if __name__ == "__main__":
- main()
diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py
deleted file mode 100644
index b996ad8..0000000
--- a/examples/pubsub/subscriber.py
+++ /dev/null
@@ -1,74 +0,0 @@
-"""A test that subscribes to NumPy arrays.
-
-Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
-"""
-
-#-----------------------------------------------------------------------------
-# Copyright (c) 2010 Brian Granger
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-
-import sys
-import time
-
-import zmq
-import numpy
-
-def sync(connect_to):
- # use connect socket + 1
- sync_with = ':'.join(connect_to.split(':')[:-1] +
- [str(int(connect_to.split(':')[-1]) + 1)]
- )
- ctx = zmq.Context.instance()
- s = ctx.socket(zmq.REQ)
- s.connect(sync_with)
- s.send('READY')
- s.recv()
-
-def main():
- if len (sys.argv) != 3:
- print 'usage: subscriber <connect_to> <array-count>'
- sys.exit (1)
-
- try:
- connect_to = sys.argv[1]
- array_count = int (sys.argv[2])
- except (ValueError, OverflowError), e:
- print 'array-count must be integers'
- sys.exit (1)
-
- ctx = zmq.Context()
- s = ctx.socket(zmq.SUB)
- s.connect(connect_to)
- s.setsockopt(zmq.SUBSCRIBE,'')
-
- sync(connect_to)
-
- start = time.clock()
-
- print "Receiving arrays..."
- for i in range(array_count):
- a = s.recv_pyobj()
- print " Done."
-
- end = time.clock()
-
- elapsed = (end - start) * 1000000
- if elapsed == 0:
- elapsed = 1
- throughput = (1000000.0 * float (array_count)) / float (elapsed)
- message_size = a.nbytes
- megabits = float (throughput * message_size * 8) / 1000000
-
- print "message size: %.0f [B]" % (message_size, )
- print "array count: %.0f" % (array_count, )
- print "mean throughput: %.0f [msg/s]" % (throughput, )
- print "mean throughput: %.3f [Mb/s]" % (megabits, )
-
- time.sleep(1.0)
-
-if __name__ == "__main__":
- main()
diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py
deleted file mode 100755
index 73b3d1c..0000000
--- a/examples/pubsub/topics_pub.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env python
-"""Simple example of publish/subscribe illustrating topics.
-
-Publisher and subscriber can be started in any order, though if publisher
-starts first, any messages sent before subscriber starts are lost. More than
-one subscriber can listen, and they can listen to different topics.
-
-Topic filtering is done simply on the start of the string, e.g. listening to
-'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
-catch 'weather'.
-"""
-
-#-----------------------------------------------------------------------------
-# Copyright (c) 2010 Brian Granger
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-import itertools
-import sys
-import time
-
-import zmq
-
-def main():
- if len (sys.argv) != 2:
- print 'usage: publisher <bind-to>'
- sys.exit (1)
-
- bind_to = sys.argv[1]
-
- all_topics = ['sports.general','sports.football','sports.basketball',
- 'stocks.general','stocks.GOOG','stocks.AAPL',
- 'weather']
-
- ctx = zmq.Context()
- s = ctx.socket(zmq.PUB)
- s.bind(bind_to)
-
- print "Starting broadcast on topics:"
- print " %s" % all_topics
- print "Hit Ctrl-C to stop broadcasting."
- print "Waiting so subscriber sockets can connect..."
- print
- time.sleep(1.0)
-
- msg_counter = itertools.count()
- try:
- for topic in itertools.cycle(all_topics):
- msg_body = str(msg_counter.next())
- print ' Topic: %s, msg:%s' % (topic, msg_body)
- s.send_multipart([topic, msg_body])
- # short wait so we don't hog the cpu
- time.sleep(0.1)
- except KeyboardInterrupt:
- pass
-
- print "Waiting for message queues to flush..."
- time.sleep(0.5)
- print "Done."
-
-if __name__ == "__main__":
- main()
diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py
deleted file mode 100755
index 4a61fb5..0000000
--- a/examples/pubsub/topics_sub.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env python
-"""Simple example of publish/subscribe illustrating topics.
-
-Publisher and subscriber can be started in any order, though if publisher
-starts first, any messages sent before subscriber starts are lost. More than
-one subscriber can listen, and they can listen to different topics.
-
-Topic filtering is done simply on the start of the string, e.g. listening to
-'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
-catch 'weather'.
-"""
-
-#-----------------------------------------------------------------------------
-# Copyright (c) 2010 Brian Granger, Fernando Perez
-#
-# Distributed under the terms of the New BSD License. The full license is in
-# the file COPYING.BSD, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-import sys
-import time
-
-import zmq
-import numpy
-
-def main():
- if len (sys.argv) < 2:
- print 'usage: subscriber <connect_to> [topic topic ...]'
- sys.exit (1)
-
- connect_to = sys.argv[1]
- topics = sys.argv[2:]
-
- ctx = zmq.Context()
- s = ctx.socket(zmq.SUB)
- s.connect(connect_to)
-
- # manage subscriptions
- if not topics:
- print "Receiving messages on ALL topics..."
- s.setsockopt(zmq.SUBSCRIBE,'')
- else:
- print "Receiving messages on topics: %s ..." % topics
- for t in topics:
- s.setsockopt(zmq.SUBSCRIBE,t)
- print
- try:
- while True:
- topic, msg = s.recv_multipart()
- print ' Topic: %s, msg:%s' % (topic, msg)
- except KeyboardInterrupt:
- pass
- print "Done."
-
-if __name__ == "__main__":
- main()