summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
committerMicah Anderson <micah@riseup.net>2014-11-11 11:53:55 -0500
commit7d5c3dcd969161322deed6c43f8a6a3cb92c3369 (patch)
tree109b05c88c7252d7609ef324d62ef9dd7f06123f /examples
parent44be832c5708baadd146cb954befbc3dcad8d463 (diff)
upgrade to 14.4.1upstream/14.4.1
Diffstat (limited to 'examples')
-rw-r--r--examples/LICENSE3
-rw-r--r--examples/README_PY3K10
-rw-r--r--examples/bench/benchmark.py25
-rw-r--r--examples/bench/jsonrpc_client.py4
-rw-r--r--examples/bench/jsonrpc_server.py8
-rw-r--r--examples/bench/latency.pngbin0 -> 58452 bytes
-rw-r--r--examples/bench/msgs_sec.pngbin0 -> 56500 bytes
-rw-r--r--examples/bench/msgs_sec_log.pngbin0 -> 59966 bytes
-rw-r--r--examples/bench/msgs_sec_ratio.pngbin0 -> 39876 bytes
-rw-r--r--examples/bench/plot_latency.py84
-rw-r--r--examples/bench/pyro_client.py3
-rw-r--r--examples/bench/pyro_server.py14
-rw-r--r--examples/bench/pyzmq_client.py16
-rw-r--r--examples/bench/pyzmq_server.py10
-rw-r--r--examples/bench/xmlrpc_client.py6
-rw-r--r--examples/bench/xmlrpc_server.py8
-rw-r--r--examples/chat/display.py41
-rw-r--r--examples/chat/prompt.py39
-rw-r--r--examples/device/client.py38
-rw-r--r--examples/device/server.py52
-rw-r--r--examples/eventloop/asyncweb.py96
-rw-r--r--examples/eventloop/echo.py27
-rw-r--r--examples/eventloop/echostream.py24
-rw-r--r--examples/eventloop/web.py46
-rw-r--r--examples/gevent/poll.py42
-rw-r--r--examples/gevent/reqrep.py47
-rw-r--r--examples/gevent/simple.py37
-rw-r--r--examples/heartbeat/heart.py34
-rw-r--r--examples/heartbeat/heartbeater.py90
-rw-r--r--examples/heartbeat/ping.py35
-rw-r--r--examples/heartbeat/pong.py34
-rw-r--r--examples/logger/zmqlogger.py70
-rw-r--r--examples/mongodb/client.py46
-rw-r--r--examples/mongodb/controller.py91
-rw-r--r--examples/monitoring/simple_monitor.py112
-rw-r--r--examples/poll/pair.py56
-rw-r--r--examples/poll/pubsub.py57
-rw-r--r--examples/poll/reqrep.py71
-rw-r--r--examples/pubsub/publisher.py57
-rw-r--r--examples/pubsub/subscriber.py74
-rwxr-xr-xexamples/pubsub/topics_pub.py64
-rwxr-xr-xexamples/pubsub/topics_sub.py56
-rw-r--r--examples/security/generate_certificates.py49
-rw-r--r--examples/security/grasslands.py29
-rw-r--r--examples/security/ioloop-ironhouse.py116
-rw-r--r--examples/security/ironhouse.py95
-rw-r--r--examples/security/stonehouse.py95
-rw-r--r--examples/security/strawhouse.py94
-rw-r--r--examples/security/woodhouse.py90
-rw-r--r--examples/serialization/serialsocket.py74
-rw-r--r--examples/win32-interrupt/display.py45
-rw-r--r--examples/win32-interrupt/prompt.py39
52 files changed, 2353 insertions, 0 deletions
diff --git a/examples/LICENSE b/examples/LICENSE
new file mode 100644
index 0000000..d4d3950
--- /dev/null
+++ b/examples/LICENSE
@@ -0,0 +1,3 @@
+PyZMQ examples are copyright their respective authors, and licensed
+under the New BSD License as described in COPYING.BSD unless otherwise
+specified in the file. \ No newline at end of file
diff --git a/examples/README_PY3K b/examples/README_PY3K
new file mode 100644
index 0000000..d5272d0
--- /dev/null
+++ b/examples/README_PY3K
@@ -0,0 +1,10 @@
+These examples use Python2 syntax. Due to the change in Python from bytestring str objects
+to unicode str objects, 2to3 does not perform an adequate transform of the code. Examples
+can be valid on both Python2.5 and Python3, but such code is less readable than it should be.
+
+As a result, the Python3 examples are kept in a separate repo:
+
+https://github.com/minrk/pyzmq-py3k-examples
+
+
+The differences are very small, but important. \ No newline at end of file
diff --git a/examples/bench/benchmark.py b/examples/bench/benchmark.py
new file mode 100644
index 0000000..c379af9
--- /dev/null
+++ b/examples/bench/benchmark.py
@@ -0,0 +1,25 @@
+from timeit import default_timer as timer
+
+def benchmark(f, size, reps):
+ msg = size*'0'
+ t1 = timer()
+ for i in range(reps):
+ msg2 = f(msg)
+ assert msg == msg2
+ t2 = timer()
+ diff = (t2-t1)
+ latency = diff/reps
+ return latency*1000000
+
+kB = [1000*2**n for n in range(10)]
+MB = [1000000*2**n for n in range(8)]
+sizes = [1] + kB + MB
+
+def benchmark_set(f, sizes, reps):
+ latencies = []
+ for size, rep in zip(sizes, reps):
+ print "Running benchmark with %r reps of %r bytes" % (rep, size)
+ lat = benchmark(f, size, rep)
+ latencies.append(lat)
+ return sizes, latencies
+
diff --git a/examples/bench/jsonrpc_client.py b/examples/bench/jsonrpc_client.py
new file mode 100644
index 0000000..7fb6ef4
--- /dev/null
+++ b/examples/bench/jsonrpc_client.py
@@ -0,0 +1,4 @@
+from timeit import default_timer as timer
+from jsonrpclib import Server
+
+client = Server('http://localhost:10000')
diff --git a/examples/bench/jsonrpc_server.py b/examples/bench/jsonrpc_server.py
new file mode 100644
index 0000000..4500a02
--- /dev/null
+++ b/examples/bench/jsonrpc_server.py
@@ -0,0 +1,8 @@
+from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
+
+def echo(x):
+ return x
+
+server = SimpleJSONRPCServer(('localhost',10000))
+server.register_function(echo)
+server.serve_forever() \ No newline at end of file
diff --git a/examples/bench/latency.png b/examples/bench/latency.png
new file mode 100644
index 0000000..bb414b5
--- /dev/null
+++ b/examples/bench/latency.png
Binary files differ
diff --git a/examples/bench/msgs_sec.png b/examples/bench/msgs_sec.png
new file mode 100644
index 0000000..a7b294b
--- /dev/null
+++ b/examples/bench/msgs_sec.png
Binary files differ
diff --git a/examples/bench/msgs_sec_log.png b/examples/bench/msgs_sec_log.png
new file mode 100644
index 0000000..c3a361e
--- /dev/null
+++ b/examples/bench/msgs_sec_log.png
Binary files differ
diff --git a/examples/bench/msgs_sec_ratio.png b/examples/bench/msgs_sec_ratio.png
new file mode 100644
index 0000000..0a87331
--- /dev/null
+++ b/examples/bench/msgs_sec_ratio.png
Binary files differ
diff --git a/examples/bench/plot_latency.py b/examples/bench/plot_latency.py
new file mode 100644
index 0000000..f50ef29
--- /dev/null
+++ b/examples/bench/plot_latency.py
@@ -0,0 +1,84 @@
+"""Plot latency data from messaging benchmarks.
+
+To generate the data for each library, I started the server and then did
+the following for each client::
+
+ from xmlrpc_client import client
+ for i in range(9):
+ s = '0'*10**i
+ print s
+ %timeit client.echo(s)
+"""
+
+from matplotlib.pylab import *
+
+rawdata = """# Data in milliseconds
+Bytes JSONRPC PYRO XMLRPC pyzmq_copy pyzmq_nocopy
+1 2.15 0.186 2.07 0.111 0.136
+10 2.49 0.187 1.87 0.115 0.137
+100 2.5 0.189 1.9 0.126 0.138
+1000 2.54 0.196 1.91 0.129 0.141
+10000 2.91 0.271 2.77 0.204 0.197
+100000 6.65 1.44 9.17 0.961 0.546
+1000000 50.2 15.8 81.5 8.39 2.25
+10000000 491 159 816 91.7 25.2
+100000000 5010 1560 8300 893 248
+
+"""
+with open('latency.csv','w') as f:
+ f.writelines(rawdata)
+
+data = csv2rec('latency.csv',delimiter='\t')
+
+loglog(data.bytes, data.xmlrpc*1000, label='XMLRPC')
+loglog(data.bytes, data.jsonrpc*1000, label='JSONRPC')
+loglog(data.bytes, data.pyro*1000, label='Pyro')
+loglog(data.bytes, data.pyzmq_nocopy*1000, label='PyZMQ')
+loglog(data.bytes, len(data.bytes)*[60], label='Ping')
+legend(loc=2)
+title('Latency')
+xlabel('Number of bytes')
+ylabel('Round trip latency ($\mu s$)')
+grid(True)
+show()
+savefig('latency.png')
+
+clf()
+
+semilogx(data.bytes, 1000/data.xmlrpc, label='XMLRPC')
+semilogx(data.bytes, 1000/data.jsonrpc, label='JSONRPC')
+semilogx(data.bytes, 1000/data.pyro, label='Pyro')
+semilogx(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ')
+legend(loc=1)
+xlabel('Number of bytes')
+ylabel('Message/s')
+title('Message Throughput')
+grid(True)
+show()
+savefig('msgs_sec.png')
+
+clf()
+
+loglog(data.bytes, 1000/data.xmlrpc, label='XMLRPC')
+loglog(data.bytes, 1000/data.jsonrpc, label='JSONRPC')
+loglog(data.bytes, 1000/data.pyro, label='Pyro')
+loglog(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ')
+legend(loc=3)
+xlabel('Number of bytes')
+ylabel('Message/s')
+title('Message Throughput')
+grid(True)
+show()
+savefig('msgs_sec_log.png')
+
+clf()
+
+semilogx(data.bytes, data.pyro/data.pyzmq_nocopy, label="No-copy")
+semilogx(data.bytes, data.pyro/data.pyzmq_copy, label="Copy")
+xlabel('Number of bytes')
+ylabel('Ratio throughputs')
+title('PyZMQ Throughput/Pyro Throughput')
+grid(True)
+legend(loc=2)
+show()
+savefig('msgs_sec_ratio.png')
diff --git a/examples/bench/pyro_client.py b/examples/bench/pyro_client.py
new file mode 100644
index 0000000..5e25feb
--- /dev/null
+++ b/examples/bench/pyro_client.py
@@ -0,0 +1,3 @@
+import Pyro.core
+
+client = Pyro.core.getProxyForURI("PYROLOC://localhost:7766/echo") \ No newline at end of file
diff --git a/examples/bench/pyro_server.py b/examples/bench/pyro_server.py
new file mode 100644
index 0000000..a2a2446
--- /dev/null
+++ b/examples/bench/pyro_server.py
@@ -0,0 +1,14 @@
+import Pyro.core
+
+class Echo(Pyro.core.ObjBase):
+ def __init__(self):
+ Pyro.core.ObjBase.__init__(self)
+ def echo(self, x):
+ return x
+
+Pyro.core.initServer()
+daemon=Pyro.core.Daemon()
+uri=daemon.connect(Echo(),"echo")
+
+daemon.requestLoop()
+ \ No newline at end of file
diff --git a/examples/bench/pyzmq_client.py b/examples/bench/pyzmq_client.py
new file mode 100644
index 0000000..9afccec
--- /dev/null
+++ b/examples/bench/pyzmq_client.py
@@ -0,0 +1,16 @@
+import zmq
+
+c = zmq.Context()
+s = c.socket(zmq.REQ)
+s.connect('tcp://127.0.0.1:10001')
+
+def echo(msg):
+ s.send(msg, copy=False)
+ msg2 = s.recv(copy=False)
+ return msg2
+
+class Client(object):
+ pass
+
+client = Client()
+client.echo = echo
diff --git a/examples/bench/pyzmq_server.py b/examples/bench/pyzmq_server.py
new file mode 100644
index 0000000..cab0082
--- /dev/null
+++ b/examples/bench/pyzmq_server.py
@@ -0,0 +1,10 @@
+import zmq
+
+c = zmq.Context()
+s = c.socket(zmq.REP)
+s.bind('tcp://127.0.0.1:10001')
+
+while True:
+ msg = s.recv(copy=False)
+ s.send(msg)
+
diff --git a/examples/bench/xmlrpc_client.py b/examples/bench/xmlrpc_client.py
new file mode 100644
index 0000000..a73ddfd
--- /dev/null
+++ b/examples/bench/xmlrpc_client.py
@@ -0,0 +1,6 @@
+from timeit import default_timer as timer
+from xmlrpclib import ServerProxy
+
+client = ServerProxy('http://localhost:10002')
+
+ \ No newline at end of file
diff --git a/examples/bench/xmlrpc_server.py b/examples/bench/xmlrpc_server.py
new file mode 100644
index 0000000..24ab019
--- /dev/null
+++ b/examples/bench/xmlrpc_server.py
@@ -0,0 +1,8 @@
+from SimpleXMLRPCServer import SimpleXMLRPCServer
+
+def echo(x):
+ return x
+
+server = SimpleXMLRPCServer(('localhost',10002))
+server.register_function(echo)
+server.serve_forever() \ No newline at end of file
diff --git a/examples/chat/display.py b/examples/chat/display.py
new file mode 100644
index 0000000..d4e240a
--- /dev/null
+++ b/examples/chat/display.py
@@ -0,0 +1,41 @@
+"""The display part of a simply two process chat app."""
+
+#
+# Copyright (c) 2010 Andrew Gwozdziewycz
+#
+# This file is part of pyzmq.
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import zmq
+
+def main(addrs):
+
+ context = zmq.Context()
+ socket = context.socket(zmq.SUB)
+ socket.setsockopt(zmq.SUBSCRIBE, "")
+ for addr in addrs:
+ print "Connecting to: ", addr
+ socket.connect(addr)
+
+ while True:
+ msg = socket.recv_pyobj()
+ print "%s: %s" % (msg[1], msg[0])
+
+if __name__ == '__main__':
+ import sys
+ if len(sys.argv) < 2:
+ print "usage: display.py <address> [,<address>...]"
+ raise SystemExit
+ main(sys.argv[1:])
diff --git a/examples/chat/prompt.py b/examples/chat/prompt.py
new file mode 100644
index 0000000..d9b12ec
--- /dev/null
+++ b/examples/chat/prompt.py
@@ -0,0 +1,39 @@
+"""The prompt part of a simply two process chat app."""
+
+#
+# Copyright (c) 2010 Andrew Gwozdziewycz
+#
+# This file is part of pyzmq.
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import zmq
+
+def main(addr, who):
+
+ ctx = zmq.Context()
+ socket = ctx.socket(zmq.PUB)
+ socket.bind(addr)
+
+ while True:
+ msg = raw_input("%s> " % who)
+ socket.send_pyobj((msg, who))
+
+
+if __name__ == '__main__':
+ import sys
+ if len(sys.argv) != 3:
+ print "usage: prompt.py <address> <username>"
+ raise SystemExit
+ main(sys.argv[1], sys.argv[2])
diff --git a/examples/device/client.py b/examples/device/client.py
new file mode 100644
index 0000000..14a4e26
--- /dev/null
+++ b/examples/device/client.py
@@ -0,0 +1,38 @@
+"""A client for the device based server."""
+
+#
+# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov
+#
+# This file is part of pyzmq.
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import zmq
+import os
+from time import time
+
+print 'Client', os.getpid()
+
+context = zmq.Context(1)
+
+socket = context.socket(zmq.REQ)
+socket.connect('tcp://127.0.0.1:5555')
+
+while True:
+ data = zmq.Message(str(os.getpid()))
+ start = time()
+ socket.send(data)
+ data = socket.recv()
+ print time()-start, data
+
diff --git a/examples/device/server.py b/examples/device/server.py
new file mode 100644
index 0000000..b29a3c1
--- /dev/null
+++ b/examples/device/server.py
@@ -0,0 +1,52 @@
+"""A device based server."""
+
+#
+# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov
+#
+# This file is part of pyzmq.
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import zmq
+import os
+import threading
+import time
+
+print 'Server', os.getpid()
+
+def routine(context):
+ socket = context.socket(zmq.REP)
+
+ socket.connect("inproc://workers")
+
+ while True:
+ message = socket.recv()
+ time.sleep(1)
+ socket.send(message)
+
+context = zmq.Context(1)
+
+workers = context.socket(zmq.DEALER)
+workers.bind("inproc://workers");
+
+clients = context.socket(zmq.DEALER)
+clients.bind('tcp://127.0.0.1:5555')
+
+for i in range(10):
+ thread = threading.Thread(target=routine, args=(context, ))
+ thread.start()
+
+zmq.device(zmq.QUEUE, clients, workers)
+
+print "Finished"
diff --git a/examples/eventloop/asyncweb.py b/examples/eventloop/asyncweb.py
new file mode 100644
index 0000000..06b03f3
--- /dev/null
+++ b/examples/eventloop/asyncweb.py
@@ -0,0 +1,96 @@
+"""Async web request example with tornado.
+
+Requests to localhost:8888 will be relayed via 0MQ to a slow responder,
+who will take 1-5 seconds to respond. The tornado app will remain responsive
+duriung this time, and when the worker replies, the web request will finish.
+
+A '.' is printed every 100ms to demonstrate that the zmq request is not blocking
+the event loop.
+"""
+
+
+import sys
+import random
+import threading
+import time
+
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+"""
+ioloop.install() must be called prior to instantiating *any* tornado objects,
+and ideally before importing anything from tornado, just to be safe.
+
+install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's
+IOLoop. If this is not done properly, multiple IOLoop instances may be
+created, which will have the effect of some subset of handlers never being
+called, because only one loop will be running.
+"""
+
+ioloop.install()
+
+import tornado
+from tornado import web
+
+
+def slow_responder():
+ """thread for slowly responding to replies."""
+ ctx = zmq.Context.instance()
+ socket = ctx.socket(zmq.REP)
+ socket.linger = 0
+ socket.bind('tcp://127.0.0.1:5555')
+ i=0
+ while True:
+ msg = socket.recv()
+ print "\nworker received %r\n" % msg
+ time.sleep(random.randint(1,5))
+ socket.send(msg + " to you too, #%i" % i)
+ i+=1
+
+def dot():
+ """callback for showing that IOLoop is still responsive while we wait"""
+ sys.stdout.write('.')
+ sys.stdout.flush()
+
+def printer(msg):
+ print (msg)
+
+class TestHandler(web.RequestHandler):
+
+ @web.asynchronous
+ def get(self):
+ ctx = zmq.Context.instance()
+ s = ctx.socket(zmq.REQ)
+ s.connect('tcp://127.0.0.1:5555')
+ # send request to worker
+ s.send('hello')
+ loop = ioloop.IOLoop.instance()
+ self.stream = zmqstream.ZMQStream(s)
+ self.stream.on_recv(self.handle_reply)
+
+ def handle_reply(self, msg):
+ # finish web request with worker's reply
+ reply = msg[0]
+ print "\nfinishing with %r\n" % reply,
+ self.stream.close()
+ self.write(reply)
+ self.finish()
+
+def main():
+ worker = threading.Thread(target=slow_responder)
+ worker.daemon=True
+ worker.start()
+
+ application = web.Application([(r"/", TestHandler)])
+ beat = ioloop.PeriodicCallback(dot, 100)
+ beat.start()
+ application.listen(8888)
+ try:
+ ioloop.IOLoop.instance().start()
+ except KeyboardInterrupt:
+ print ' Interrupted'
+
+
+if __name__ == "__main__":
+ main()
+
diff --git a/examples/eventloop/echo.py b/examples/eventloop/echo.py
new file mode 100644
index 0000000..9be079b
--- /dev/null
+++ b/examples/eventloop/echo.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+"""A trivial ZMQ echo server using the eventloop.
+
+Authors
+-------
+* MinRK
+"""
+
+import zmq
+from zmq.eventloop import ioloop
+
+loop = ioloop.IOLoop.instance()
+
+ctx = zmq.Context()
+s = ctx.socket(zmq.REP)
+s.bind('tcp://127.0.0.1:5555')
+
+def rep_handler(sock, events):
+ # We don't know how many recv's we can do?
+ msg = sock.recv()
+ # No guarantee that we can do the send. We need a way of putting the
+ # send in the event loop.
+ sock.send(msg)
+
+loop.add_handler(s, rep_handler, zmq.POLLIN)
+
+loop.start() \ No newline at end of file
diff --git a/examples/eventloop/echostream.py b/examples/eventloop/echostream.py
new file mode 100644
index 0000000..04c1532
--- /dev/null
+++ b/examples/eventloop/echostream.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+"""Adapted echo.py to put the send in the event loop using a ZMQStream.
+
+Authors
+-------
+* MinRK
+"""
+
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+loop = ioloop.IOLoop.instance()
+
+ctx = zmq.Context()
+s = ctx.socket(zmq.REP)
+s.bind('tcp://127.0.0.1:5555')
+stream = zmqstream.ZMQStream(s, loop)
+
+def echo(msg):
+ print " ".join(msg)
+ stream.send_multipart(msg)
+
+stream.on_recv(echo)
+
+loop.start() \ No newline at end of file
diff --git a/examples/eventloop/web.py b/examples/eventloop/web.py
new file mode 100644
index 0000000..1285f95
--- /dev/null
+++ b/examples/eventloop/web.py
@@ -0,0 +1,46 @@
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+"""
+ioloop.install() must be called prior to instantiating *any* tornado objects,
+and ideally before importing anything from tornado, just to be safe.
+
+install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's
+IOLoop. If this is not done properly, multiple IOLoop instances may be
+created, which will have the effect of some subset of handlers never being
+called, because only one loop will be running.
+"""
+
+ioloop.install()
+
+import tornado
+import tornado.web
+
+
+"""
+this application can be used with echostream.py, start echostream.py,
+start web.py, then every time you hit http://localhost:8888/,
+echostream.py will print out 'hello'
+"""
+
+def printer(msg):
+ print (msg)
+
+ctx = zmq.Context()
+s = ctx.socket(zmq.REQ)
+s.connect('tcp://127.0.0.1:5555')
+stream = zmqstream.ZMQStream(s)
+stream.on_recv(printer)
+
+class TestHandler(tornado.web.RequestHandler):
+ def get(self):
+ print ("sending hello")
+ stream.send("hello")
+ self.write("hello")
+application = tornado.web.Application([(r"/", TestHandler)])
+
+if __name__ == "__main__":
+ application.listen(8888)
+ ioloop.IOLoop.instance().start()
+
+
diff --git a/examples/gevent/poll.py b/examples/gevent/poll.py
new file mode 100644
index 0000000..1daf80a
--- /dev/null
+++ b/examples/gevent/poll.py
@@ -0,0 +1,42 @@
+import gevent
+from zmq import green as zmq
+
+# Connect to both receiving sockets and send 10 messages
+def sender():
+
+ sender = context.socket(zmq.PUSH)
+ sender.connect('inproc://polltest1')
+ sender.connect('inproc://polltest2')
+
+ for i in xrange(10):
+ sender.send('test %d' % i)
+ gevent.sleep(1)
+
+
+# create zmq context, and bind to pull sockets
+context = zmq.Context()
+receiver1 = context.socket(zmq.PULL)
+receiver1.bind('inproc://polltest1')
+receiver2 = context.socket(zmq.PULL)
+receiver2.bind('inproc://polltest2')
+
+gevent.spawn(sender)
+
+# Create poller and register both reciever sockets
+poller = zmq.Poller()
+poller.register(receiver1, zmq.POLLIN)
+poller.register(receiver2, zmq.POLLIN)
+
+# Read 10 messages from both reciever sockets
+msgcnt = 0
+while msgcnt < 10:
+ socks = dict(poller.poll())
+ if receiver1 in socks and socks[receiver1] == zmq.POLLIN:
+ print "Message from receiver1: %s" % receiver1.recv()
+ msgcnt += 1
+
+ if receiver2 in socks and socks[receiver2] == zmq.POLLIN:
+ print "Message from receiver2: %s" % receiver2.recv()
+ msgcnt += 1
+
+print "%d messages received" % msgcnt
diff --git a/examples/gevent/reqrep.py b/examples/gevent/reqrep.py
new file mode 100644
index 0000000..2a4f307
--- /dev/null
+++ b/examples/gevent/reqrep.py
@@ -0,0 +1,47 @@
+"""
+Complex example which is a combination of the rr* examples from the zguide.
+"""
+from gevent import spawn
+import zmq.green as zmq
+
+# server
+context = zmq.Context()
+socket = context.socket(zmq.REP)
+socket.connect("tcp://localhost:5560")
+
+def serve(socket):
+ while True:
+ message = socket.recv()
+ print "Received request: ", message
+ socket.send("World")
+server = spawn(serve, socket)
+
+
+# client
+context = zmq.Context()
+socket = context.socket(zmq.REQ)
+socket.connect("tcp://localhost:5559")
+
+# Do 10 requests, waiting each time for a response
+def client():
+ for request in range(1,10):
+ socket.send("Hello")
+ message = socket.recv()
+ print "Received reply ", request, "[", message, "]"
+
+
+# broker
+frontend = context.socket(zmq.ROUTER)
+backend = context.socket(zmq.DEALER);
+frontend.bind("tcp://*:5559")
+backend.bind("tcp://*:5560")
+
+def proxy(socket_from, socket_to):
+ while True:
+ m = socket_from.recv_multipart()
+ socket_to.send_multipart(m)
+
+a = spawn(proxy, frontend, backend)
+b = spawn(proxy, backend, frontend)
+
+spawn(client).join()
diff --git a/examples/gevent/simple.py b/examples/gevent/simple.py
new file mode 100644
index 0000000..ae065b3
--- /dev/null
+++ b/examples/gevent/simple.py
@@ -0,0 +1,37 @@
+from gevent import spawn, spawn_later
+import zmq.green as zmq
+
+# server
+print zmq.Context
+ctx = zmq.Context()
+sock = ctx.socket(zmq.PUSH)
+sock.bind('ipc:///tmp/zmqtest')
+
+spawn(sock.send_pyobj, ('this', 'is', 'a', 'python', 'tuple'))
+spawn_later(1, sock.send_pyobj, {'hi': 1234})
+spawn_later(2, sock.send_pyobj, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42))
+spawn_later(3, sock.send_pyobj, 'foobar')
+spawn_later(4, sock.send_pyobj, 'quit')
+
+
+# client
+ctx = zmq.Context() # create a new context to kick the wheels
+sock = ctx.socket(zmq.PULL)
+sock.connect('ipc:///tmp/zmqtest')
+
+def get_objs(sock):
+ while True:
+ o = sock.recv_pyobj()
+ print 'received python object:', o
+ if o == 'quit':
+ print 'exiting.'
+ break
+
+def print_every(s, t=None):
+ print s
+ if t:
+ spawn_later(t, print_every, s, t)
+
+print_every('printing every half second', 0.5)
+spawn(get_objs, sock).join()
+
diff --git a/examples/heartbeat/heart.py b/examples/heartbeat/heart.py
new file mode 100644
index 0000000..175370e
--- /dev/null
+++ b/examples/heartbeat/heart.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""This launches an echoing rep socket device,
+and runs a blocking numpy action. The rep socket should
+remain responsive to pings during this time. Use heartbeater.py to
+ping this heart, and see the responsiveness.
+
+Authors
+-------
+* MinRK
+"""
+
+import time
+import numpy
+import zmq
+from zmq import devices
+
+ctx = zmq.Context()
+
+dev = devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER)
+dev.setsockopt_in(zmq.SUBSCRIBE, "")
+dev.connect_in('tcp://127.0.0.1:5555')
+dev.connect_out('tcp://127.0.0.1:5556')
+dev.start()
+
+#wait for connections
+time.sleep(1)
+
+A = numpy.random.random((2**11,2**11))
+print "starting blocking loop"
+while True:
+ tic = time.time()
+ numpy.dot(A,A.transpose())
+ print "blocked for %.3f s"%(time.time()-tic)
+
diff --git a/examples/heartbeat/heartbeater.py b/examples/heartbeat/heartbeater.py
new file mode 100644
index 0000000..180828a
--- /dev/null
+++ b/examples/heartbeat/heartbeater.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+
+For use with heart.py
+
+A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts
+are tracked based on their DEALER identities.
+
+You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding.
+
+Authors
+-------
+* MinRK
+"""
+
+import time
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+
+
+class HeartBeater(object):
+ """A basic HeartBeater class
+ pingstream: a PUB stream
+ pongstream: an ROUTER stream"""
+
+ def __init__(self, loop, pingstream, pongstream, period=1000):
+ self.loop = loop
+ self.period = period
+
+ self.pingstream = pingstream
+ self.pongstream = pongstream
+ self.pongstream.on_recv(self.handle_pong)
+
+ self.hearts = set()
+ self.responses = set()
+ self.lifetime = 0
+ self.tic = time.time()
+
+ self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop)
+ self.caller.start()
+
+ def beat(self):
+ toc = time.time()
+ self.lifetime += toc-self.tic
+ self.tic = toc
+ print self.lifetime
+ # self.message = str(self.lifetime)
+ goodhearts = self.hearts.intersection(self.responses)
+ heartfailures = self.hearts.difference(goodhearts)
+ newhearts = self.responses.difference(goodhearts)
+ # print newhearts, goodhearts, heartfailures
+ map(self.handle_new_heart, newhearts)
+ map(self.handle_heart_failure, heartfailures)
+ self.responses = set()
+ print "%i beating hearts: %s"%(len(self.hearts),self.hearts)
+ self.pingstream.send(str(self.lifetime))
+
+ def handle_new_heart(self, heart):
+ print "yay, got new heart %s!"%heart
+ self.hearts.add(heart)
+
+ def handle_heart_failure(self, heart):
+ print "Heart %s failed :("%heart
+ self.hearts.remove(heart)
+
+
+ def handle_pong(self, msg):
+ "if heart is beating"
+ if msg[1] == str(self.lifetime):
+ self.responses.add(msg[0])
+ else:
+ print "got bad heartbeat (possibly old?): %s"%msg[1]
+
+# sub.setsockopt(zmq.SUBSCRIBE)
+
+
+if __name__ == '__main__':
+ loop = ioloop.IOLoop()
+ context = zmq.Context()
+ pub = context.socket(zmq.PUB)
+ pub.bind('tcp://127.0.0.1:5555')
+ router = context.socket(zmq.ROUTER)
+ router.bind('tcp://127.0.0.1:5556')
+
+ outstream = zmqstream.ZMQStream(pub, loop)
+ instream = zmqstream.ZMQStream(router, loop)
+
+ hb = HeartBeater(loop, outstream, instream)
+
+ loop.start()
diff --git a/examples/heartbeat/ping.py b/examples/heartbeat/ping.py
new file mode 100644
index 0000000..797cb8c
--- /dev/null
+++ b/examples/heartbeat/ping.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+"""For use with pong.py
+
+This script simply pings a process started by pong.py or tspong.py, to
+demonstrate that zmq remains responsive while Python blocks.
+
+Authors
+-------
+* MinRK
+"""
+from __future__ import print_function
+
+import sys
+import time
+import numpy
+import zmq
+
+ctx = zmq.Context()
+
+req = ctx.socket(zmq.REQ)
+req.connect('tcp://127.0.0.1:10111')
+
+#wait for connects
+time.sleep(1)
+n=0
+while True:
+ time.sleep(numpy.random.random())
+ for i in range(4):
+ n+=1
+ msg = 'ping %i' % n
+ tic = time.time()
+ req.send_string(msg)
+ resp = req.recv_string()
+ print("%s: %.2f ms" % (msg, 1000*(time.time()-tic)))
+ assert msg == resp
diff --git a/examples/heartbeat/pong.py b/examples/heartbeat/pong.py
new file mode 100644
index 0000000..524f394
--- /dev/null
+++ b/examples/heartbeat/pong.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""This launches an echoing rep socket device using
+zmq.devices.ThreadDevice, and runs a blocking numpy action.
+The rep socket should remain responsive to pings during this time.
+
+Use ping.py to see how responsive it is.
+
+Authors
+-------
+* MinRK
+"""
+from __future__ import print_function
+
+import time
+import numpy
+import zmq
+from zmq import devices
+
+ctx = zmq.Context()
+
+dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
+dev.bind_in('tcp://127.0.0.1:10111')
+dev.setsockopt_in(zmq.IDENTITY, b"whoda")
+dev.start()
+
+#wait for connections
+time.sleep(1)
+
+A = numpy.random.random((2**11,2**12))
+print("starting blocking loop")
+while True:
+ tic = time.time()
+ numpy.dot(A,A.transpose())
+ print("blocked for %.3f s"%(time.time()-tic))
diff --git a/examples/logger/zmqlogger.py b/examples/logger/zmqlogger.py
new file mode 100644
index 0000000..c55b51b
--- /dev/null
+++ b/examples/logger/zmqlogger.py
@@ -0,0 +1,70 @@
+"""
+Simple example of using zmq log handlers
+
+This starts a number of subprocesses with PUBHandlers that generate
+log messages at a regular interval. The main process has a SUB socket,
+which aggregates and logs all of the messages to the root logger.
+"""
+
+import logging
+from multiprocessing import Process
+import os
+import random
+import sys
+import time
+
+import zmq
+from zmq.log.handlers import PUBHandler
+
+LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL)
+
+def sub_logger(port, level=logging.DEBUG):
+ ctx = zmq.Context()
+ sub = ctx.socket(zmq.SUB)
+ sub.bind('tcp://127.0.0.1:%i' % port)
+ sub.setsockopt(zmq.SUBSCRIBE, "")
+ logging.basicConfig(level=level)
+
+ while True:
+ level, message = sub.recv_multipart()
+ if message.endswith('\n'):
+ # trim trailing newline, which will get appended again
+ message = message[:-1]
+ log = getattr(logging, level.lower())
+ log(message)
+
+def log_worker(port, interval=1, level=logging.DEBUG):
+ ctx = zmq.Context()
+ pub = ctx.socket(zmq.PUB)
+ pub.connect('tcp://127.0.0.1:%i' % port)
+
+ logger = logging.getLogger(str(os.getpid()))
+ logger.setLevel(level)
+ handler = PUBHandler(pub)
+ logger.addHandler(handler)
+ print "starting logger at %i with level=%s" % (os.getpid(), level)
+
+ while True:
+ level = random.choice(LOG_LEVELS)
+ logger.log(level, "Hello from %i!" % os.getpid())
+ time.sleep(interval)
+
+if __name__ == '__main__':
+ if len(sys.argv) > 1:
+ n = int(sys.argv[1])
+ else:
+ n = 2
+
+ port = 5555
+
+ # start the log generators
+ workers = [ Process(target=log_worker, args=(port,), kwargs=dict(level=random.choice(LOG_LEVELS))) for i in range(n) ]
+ [ w.start() for w in workers ]
+
+ # start the log watcher
+ try:
+ sub_logger(port)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ [ w.terminate() for w in workers ]
diff --git a/examples/mongodb/client.py b/examples/mongodb/client.py
new file mode 100644
index 0000000..839dce7
--- /dev/null
+++ b/examples/mongodb/client.py
@@ -0,0 +1,46 @@
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Justin Riley
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import json
+import zmq
+
+class MongoZMQClient(object):
+ """
+ Client that connects with MongoZMQ server to add/fetch docs
+ """
+
+ def __init__(self, connect_addr='tcp://127.0.0.1:5000'):
+ self._context = zmq.Context()
+ self._socket = self._context.socket(zmq.DEALER)
+ self._socket.connect(connect_addr)
+
+ def _send_recv_msg(self, msg):
+ self._socket.send_multipart(msg)
+ return self._socket.recv_multipart()[0]
+
+ def get_doc(self, keys):
+ msg = ['get', json.dumps(keys)]
+ json_str = self._send_recv_msg(msg)
+ return json.loads(json_str)
+
+ def add_doc(self, doc):
+ msg = ['add', json.dumps(doc)]
+ return self._send_recv_msg(msg)
+
+def main():
+ client = MongoZMQClient()
+ for i in range(10):
+ doc = {'job': str(i)}
+ print "Adding doc", doc
+ print client.add_doc(doc)
+ for i in range(10):
+ query = {'job': str(i)}
+ print "Getting doc matching query:", query
+ print client.get_doc(query)
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/mongodb/controller.py b/examples/mongodb/controller.py
new file mode 100644
index 0000000..e154f1c
--- /dev/null
+++ b/examples/mongodb/controller.py
@@ -0,0 +1,91 @@
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Justin Riley
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import sys
+import zmq
+import pymongo
+import pymongo.json_util
+import json
+
+class MongoZMQ(object):
+ """
+ ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB.
+
+ NOTE: mongod must be started before using this class
+ """
+
+ def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"):
+ """
+ bind_addr: address to bind zmq socket on
+ db_name: name of database to write to (created if doesnt exist)
+ table_name: name of mongodb 'table' in the db to write to (created if doesnt exist)
+ """
+ self._bind_addr = bind_addr
+ self._db_name = db_name
+ self._table_name = table_name
+ self._conn = pymongo.Connection()
+ self._db = self._conn[self._db_name]
+ self._table = self._db[self._table_name]
+
+ def _doc_to_json(self, doc):
+ return json.dumps(doc,default=pymongo.json_util.default)
+
+ def add_document(self, doc):
+ """
+ Inserts a document (dictionary) into mongo database table
+ """
+ print 'adding docment %s' % (doc)
+ try:
+ self._table.insert(doc)
+ except Exception,e:
+ return 'Error: %s' % e
+
+ def get_document_by_keys(self, keys):
+ """
+ Attempts to return a single document from database table that matches
+ each key/value in keys dictionary.
+ """
+ print 'attempting to retrieve document using keys: %s' % keys
+ try:
+ return self._table.find_one(keys)
+ except Exception,e:
+ return 'Error: %s' % e
+
+ def start(self):
+ context = zmq.Context()
+ socket = context.socket(zmq.ROUTER)
+ socket.bind(self._bind_addr)
+ while True:
+ msg = socket.recv_multipart()
+ print "Received msg: ", msg
+ if len(msg) != 3:
+ error_msg = 'invalid message received: %s' % msg
+ print error_msg
+ reply = [msg[0], error_msg]
+ socket.send_multipart(reply)
+ continue
+ id = msg[0]
+ operation = msg[1]
+ contents = json.loads(msg[2])
+ # always send back the id with ROUTER
+ reply = [id]
+ if operation == 'add':
+ self.add_document(contents)
+ reply.append("success")
+ elif operation == 'get':
+ doc = self.get_document_by_keys(contents)
+ json_doc = self._doc_to_json(doc)
+ reply.append(json_doc)
+ else:
+ print 'unknown request'
+ socket.send_multipart(reply)
+
+def main():
+ MongoZMQ('ipcontroller','jobs').start()
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/monitoring/simple_monitor.py b/examples/monitoring/simple_monitor.py
new file mode 100644
index 0000000..6978adc
--- /dev/null
+++ b/examples/monitoring/simple_monitor.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+"""Simple example demonstrating the use of the socket monitoring feature."""
+
+# This file is part of pyzmq.
+#
+# Distributed under the terms of the New BSD License. The full
+# license is in the file COPYING.BSD, distributed as part of this
+# software.
+from __future__ import print_function
+
+__author__ = 'Guido Goldstein'
+
+import threading
+import time
+
+import zmq
+from zmq.utils.monitor import recv_monitor_message
+
+
+line = lambda: print('-' * 40)
+
+
+print("libzmq-%s" % zmq.zmq_version())
+if zmq.zmq_version_info() < (4, 0):
+ raise RuntimeError("monitoring in libzmq version < 4.0 is not supported")
+
+EVENT_MAP = {}
+print("Event names:")
+for name in dir(zmq):
+ if name.startswith('EVENT_'):
+ value = getattr(zmq, name)
+ print("%21s : %4i" % (name, value))
+ EVENT_MAP[value] = name
+
+
+def event_monitor(monitor):
+ while monitor.poll():
+ evt = recv_monitor_message(monitor)
+ evt.update({'description': EVENT_MAP[evt['event']]})
+ print("Event: {}".format(evt))
+ if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
+ break
+ monitor.close()
+ print()
+ print("event monitor thread done!")
+
+
+ctx = zmq.Context().instance()
+rep = ctx.socket(zmq.REP)
+req = ctx.socket(zmq.REQ)
+
+monitor = req.get_monitor_socket()
+
+t = threading.Thread(target=event_monitor, args=(monitor,))
+t.start()
+
+line()
+print("bind req")
+req.bind("tcp://127.0.0.1:6666")
+req.bind("tcp://127.0.0.1:6667")
+time.sleep(1)
+
+line()
+print("connect rep")
+rep.connect("tcp://127.0.0.1:6667")
+time.sleep(0.2)
+rep.connect("tcp://127.0.0.1:6666")
+time.sleep(1)
+
+line()
+print("disconnect rep")
+rep.disconnect("tcp://127.0.0.1:6667")
+time.sleep(1)
+rep.disconnect("tcp://127.0.0.1:6666")
+time.sleep(1)
+
+line()
+print("close rep")
+rep.close()
+time.sleep(1)
+
+line()
+print("disabling event monitor")
+req.disable_monitor()
+
+line()
+print("event monitor thread should now terminate")
+
+# Create a new socket to connect to listener, no more
+# events should be observed.
+rep = ctx.socket(zmq.REP)
+
+line()
+print("connect rep")
+rep.connect("tcp://127.0.0.1:6667")
+time.sleep(0.2)
+
+line()
+print("disconnect rep")
+rep.disconnect("tcp://127.0.0.1:6667")
+time.sleep(0.2)
+
+line()
+print("close rep")
+rep.close()
+
+line()
+print("close req")
+req.close()
+
+print("END")
+ctx.term()
diff --git a/examples/poll/pair.py b/examples/poll/pair.py
new file mode 100644
index 0000000..81c8b3a
--- /dev/null
+++ b/examples/poll/pair.py
@@ -0,0 +1,56 @@
+"""A thorough test of polling PAIR sockets."""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import time
+import zmq
+
+print "Running polling tests for PAIR sockets..."
+
+addr = 'tcp://127.0.0.1:5555'
+ctx = zmq.Context()
+s1 = ctx.socket(zmq.PAIR)
+s2 = ctx.socket(zmq.PAIR)
+
+s1.bind(addr)
+s2.connect(addr)
+
+# Sleep to allow sockets to connect.
+time.sleep(1.0)
+
+poller = zmq.Poller()
+poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+
+# Now make sure that both are send ready.
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT
+assert socks[s2] == zmq.POLLOUT
+
+# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
+s1.send('msg1')
+s2.send('msg2')
+time.sleep(1.0)
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT|zmq.POLLIN
+assert socks[s2] == zmq.POLLOUT|zmq.POLLIN
+
+# Make sure that both are in POLLOUT after recv.
+s1.recv()
+s2.recv()
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT
+assert socks[s2] == zmq.POLLOUT
+
+poller.unregister(s1)
+poller.unregister(s2)
+
+# Wait for everything to finish.
+time.sleep(1.0)
+
+print "Finished." \ No newline at end of file
diff --git a/examples/poll/pubsub.py b/examples/poll/pubsub.py
new file mode 100644
index 0000000..a590fa9
--- /dev/null
+++ b/examples/poll/pubsub.py
@@ -0,0 +1,57 @@
+"""A thorough test of polling PUB/SUB sockets."""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import time
+import zmq
+
+print "Running polling tets for PUB/SUB sockets..."
+
+addr = 'tcp://127.0.0.1:5555'
+ctx = zmq.Context()
+s1 = ctx.socket(zmq.PUB)
+s2 = ctx.socket(zmq.SUB)
+s2.setsockopt(zmq.SUBSCRIBE, '')
+
+s1.bind(addr)
+s2.connect(addr)
+
+# Sleep to allow sockets to connect.
+time.sleep(1.0)
+
+poller = zmq.Poller()
+poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+
+# Now make sure that both are send ready.
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT
+assert not socks.has_key(s2)
+
+# Make sure that s1 stays in POLLOUT after a send.
+s1.send('msg1')
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT
+
+# Make sure that s2 is POLLIN after waiting.
+time.sleep(0.5)
+socks = dict(poller.poll())
+assert socks[s2] == zmq.POLLIN
+
+# Make sure that s2 goes into 0 after recv.
+s2.recv()
+socks = dict(poller.poll())
+assert not socks.has_key(s2)
+
+poller.unregister(s1)
+poller.unregister(s2)
+
+# Wait for everything to finish.
+time.sleep(1.0)
+
+print "Finished."
diff --git a/examples/poll/reqrep.py b/examples/poll/reqrep.py
new file mode 100644
index 0000000..ef4436c
--- /dev/null
+++ b/examples/poll/reqrep.py
@@ -0,0 +1,71 @@
+"""A thorough test of polling REQ/REP sockets."""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import time
+import zmq
+
+print "Running polling tests for REQ/REP sockets..."
+
+addr = 'tcp://127.0.0.1:5555'
+ctx = zmq.Context()
+s1 = ctx.socket(zmq.REP)
+s2 = ctx.socket(zmq.REQ)
+
+s1.bind(addr)
+s2.connect(addr)
+
+# Sleep to allow sockets to connect.
+time.sleep(1.0)
+
+poller = zmq.Poller()
+poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
+poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
+
+# Make sure that s1 is in state 0 and s2 is in POLLOUT
+socks = dict(poller.poll())
+assert not socks.has_key(s1)
+assert socks[s2] == zmq.POLLOUT
+
+# Make sure that s2 goes immediately into state 0 after send.
+s2.send('msg1')
+socks = dict(poller.poll())
+assert not socks.has_key(s2)
+
+# Make sure that s1 goes into POLLIN state after a time.sleep().
+time.sleep(0.5)
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLIN
+
+# Make sure that s1 goes into POLLOUT after recv.
+s1.recv()
+socks = dict(poller.poll())
+assert socks[s1] == zmq.POLLOUT
+
+# Make sure s1 goes into state 0 after send.
+s1.send('msg2')
+socks = dict(poller.poll())
+assert not socks.has_key(s1)
+
+# Wait and then see that s2 is in POLLIN.
+time.sleep(0.5)
+socks = dict(poller.poll())
+assert socks[s2] == zmq.POLLIN
+
+# Make sure that s2 is in POLLOUT after recv.
+s2.recv()
+socks = dict(poller.poll())
+assert socks[s2] == zmq.POLLOUT
+
+poller.unregister(s1)
+poller.unregister(s2)
+
+# Wait for everything to finish.
+time.sleep(1.0)
+
+print "Finished."
diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py
new file mode 100644
index 0000000..a2ce6c9
--- /dev/null
+++ b/examples/pubsub/publisher.py
@@ -0,0 +1,57 @@
+"""A test that publishes NumPy arrays.
+
+Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def sync(bind_to):
+ # use bind socket + 1
+ sync_with = ':'.join(bind_to.split(':')[:-1] +
+ [str(int(bind_to.split(':')[-1]) + 1)])
+ ctx = zmq.Context.instance()
+ s = ctx.socket(zmq.REP)
+ s.bind(sync_with)
+ print "Waiting for subscriber to connect..."
+ s.recv()
+ print " Done."
+ s.send('GO')
+
+def main():
+ if len (sys.argv) != 4:
+ print 'usage: publisher <bind-to> <array-size> <array-count>'
+ sys.exit (1)
+
+ try:
+ bind_to = sys.argv[1]
+ array_size = int(sys.argv[2])
+ array_count = int (sys.argv[3])
+ except (ValueError, OverflowError), e:
+ print 'array-size and array-count must be integers'
+ sys.exit (1)
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PUB)
+ s.bind(bind_to)
+
+ sync(bind_to)
+
+ print "Sending arrays..."
+ for i in range(array_count):
+ a = numpy.random.rand(array_size, array_size)
+ s.send_pyobj(a)
+ print " Done."
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py
new file mode 100644
index 0000000..b996ad8
--- /dev/null
+++ b/examples/pubsub/subscriber.py
@@ -0,0 +1,74 @@
+"""A test that subscribes to NumPy arrays.
+
+Uses REQ/REP (on PUB/SUB socket + 1) to synchronize
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def sync(connect_to):
+ # use connect socket + 1
+ sync_with = ':'.join(connect_to.split(':')[:-1] +
+ [str(int(connect_to.split(':')[-1]) + 1)]
+ )
+ ctx = zmq.Context.instance()
+ s = ctx.socket(zmq.REQ)
+ s.connect(sync_with)
+ s.send('READY')
+ s.recv()
+
+def main():
+ if len (sys.argv) != 3:
+ print 'usage: subscriber <connect_to> <array-count>'
+ sys.exit (1)
+
+ try:
+ connect_to = sys.argv[1]
+ array_count = int (sys.argv[2])
+ except (ValueError, OverflowError), e:
+ print 'array-count must be integers'
+ sys.exit (1)
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.SUB)
+ s.connect(connect_to)
+ s.setsockopt(zmq.SUBSCRIBE,'')
+
+ sync(connect_to)
+
+ start = time.clock()
+
+ print "Receiving arrays..."
+ for i in range(array_count):
+ a = s.recv_pyobj()
+ print " Done."
+
+ end = time.clock()
+
+ elapsed = (end - start) * 1000000
+ if elapsed == 0:
+ elapsed = 1
+ throughput = (1000000.0 * float (array_count)) / float (elapsed)
+ message_size = a.nbytes
+ megabits = float (throughput * message_size * 8) / 1000000
+
+ print "message size: %.0f [B]" % (message_size, )
+ print "array count: %.0f" % (array_count, )
+ print "mean throughput: %.0f [msg/s]" % (throughput, )
+ print "mean throughput: %.3f [Mb/s]" % (megabits, )
+
+ time.sleep(1.0)
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py
new file mode 100755
index 0000000..73b3d1c
--- /dev/null
+++ b/examples/pubsub/topics_pub.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+"""Simple example of publish/subscribe illustrating topics.
+
+Publisher and subscriber can be started in any order, though if publisher
+starts first, any messages sent before subscriber starts are lost. More than
+one subscriber can listen, and they can listen to different topics.
+
+Topic filtering is done simply on the start of the string, e.g. listening to
+'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
+catch 'weather'.
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import itertools
+import sys
+import time
+
+import zmq
+
+def main():
+ if len (sys.argv) != 2:
+ print 'usage: publisher <bind-to>'
+ sys.exit (1)
+
+ bind_to = sys.argv[1]
+
+ all_topics = ['sports.general','sports.football','sports.basketball',
+ 'stocks.general','stocks.GOOG','stocks.AAPL',
+ 'weather']
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.PUB)
+ s.bind(bind_to)
+
+ print "Starting broadcast on topics:"
+ print " %s" % all_topics
+ print "Hit Ctrl-C to stop broadcasting."
+ print "Waiting so subscriber sockets can connect..."
+ print
+ time.sleep(1.0)
+
+ msg_counter = itertools.count()
+ try:
+ for topic in itertools.cycle(all_topics):
+ msg_body = str(msg_counter.next())
+ print ' Topic: %s, msg:%s' % (topic, msg_body)
+ s.send_multipart([topic, msg_body])
+ # short wait so we don't hog the cpu
+ time.sleep(0.1)
+ except KeyboardInterrupt:
+ pass
+
+ print "Waiting for message queues to flush..."
+ time.sleep(0.5)
+ print "Done."
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py
new file mode 100755
index 0000000..4a61fb5
--- /dev/null
+++ b/examples/pubsub/topics_sub.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+"""Simple example of publish/subscribe illustrating topics.
+
+Publisher and subscriber can be started in any order, though if publisher
+starts first, any messages sent before subscriber starts are lost. More than
+one subscriber can listen, and they can listen to different topics.
+
+Topic filtering is done simply on the start of the string, e.g. listening to
+'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
+catch 'weather'.
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010 Brian Granger, Fernando Perez
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import sys
+import time
+
+import zmq
+import numpy
+
+def main():
+ if len (sys.argv) < 2:
+ print 'usage: subscriber <connect_to> [topic topic ...]'
+ sys.exit (1)
+
+ connect_to = sys.argv[1]
+ topics = sys.argv[2:]
+
+ ctx = zmq.Context()
+ s = ctx.socket(zmq.SUB)
+ s.connect(connect_to)
+
+ # manage subscriptions
+ if not topics:
+ print "Receiving messages on ALL topics..."
+ s.setsockopt(zmq.SUBSCRIBE,'')
+ else:
+ print "Receiving messages on topics: %s ..." % topics
+ for t in topics:
+ s.setsockopt(zmq.SUBSCRIBE,t)
+ print
+ try:
+ while True:
+ topic, msg = s.recv_multipart()
+ print ' Topic: %s, msg:%s' % (topic, msg)
+ except KeyboardInterrupt:
+ pass
+ print "Done."
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/security/generate_certificates.py b/examples/security/generate_certificates.py
new file mode 100644
index 0000000..80db258
--- /dev/null
+++ b/examples/security/generate_certificates.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+
+"""
+Generate client and server CURVE certificate files then move them into the
+appropriate store directory, private_keys or public_keys. The certificates
+generated by this script are used by the stonehouse and ironhouse examples.
+
+In practice this would be done by hand or some out-of-band process.
+
+Author: Chris Laws
+"""
+
+import os
+import shutil
+import zmq.auth
+
+def generate_certificates(base_dir):
+ ''' Generate client and server CURVE certificate files'''
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ # Create directories for certificates, remove old content if necessary
+ for d in [keys_dir, public_keys_dir, secret_keys_dir]:
+ if os.path.exists(d):
+ shutil.rmtree(d)
+ os.mkdir(d)
+
+ # create new keys in certificates dir
+ server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server")
+ client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client")
+
+ # move public keys to appropriate directory
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(public_keys_dir, '.'))
+
+ # move secret keys to appropriate directory
+ for key_file in os.listdir(keys_dir):
+ if key_file.endswith(".key_secret"):
+ shutil.move(os.path.join(keys_dir, key_file),
+ os.path.join(secret_keys_dir, '.'))
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ generate_certificates(os.path.dirname(__file__))
diff --git a/examples/security/grasslands.py b/examples/security/grasslands.py
new file mode 100644
index 0000000..cbd3ab9
--- /dev/null
+++ b/examples/security/grasslands.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+
+'''
+No protection at all.
+
+All connections are accepted, there is no authentication, and no privacy.
+
+This is how ZeroMQ always worked until we built security into the wire
+protocol in early 2013. Internally, it uses a security mechanism called
+"NULL".
+
+Author: Chris Laws
+'''
+
+import zmq
+
+
+ctx = zmq.Context().instance()
+
+server = ctx.socket(zmq.PUSH)
+server.bind('tcp://*:9000')
+
+client = ctx.socket(zmq.PULL)
+client.connect('tcp://127.0.0.1:9000')
+
+server.send(b"Hello")
+msg = client.recv()
+if msg == b"Hello":
+ print("Grasslands test OK")
diff --git a/examples/security/ioloop-ironhouse.py b/examples/security/ioloop-ironhouse.py
new file mode 100644
index 0000000..fbde306
--- /dev/null
+++ b/examples/security/ioloop-ironhouse.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+'''
+Ironhouse extends Stonehouse with client public key authentication.
+
+This is the strongest security model we have today, protecting against every
+attack we know about, except end-point attacks (where an attacker plants
+spyware on a machine to capture data before it's encrypted, or after it's
+decrypted).
+
+This example demonstrates using the IOLoopAuthenticator.
+
+Author: Chris Laws
+'''
+
+import logging
+import os
+import sys
+
+import zmq
+import zmq.auth
+from zmq.auth.ioloop import IOLoopAuthenticator
+from zmq.eventloop import ioloop, zmqstream
+
+def echo(server, msg):
+ logging.debug("server recvd %s", msg)
+ reply = msg + [b'World']
+ logging.debug("server sending %s", reply)
+ server.send_multipart(reply)
+
+def setup_server(server_secret_file, endpoint='tcp://127.0.0.1:9000'):
+ """setup a simple echo server with CURVE auth"""
+ server = zmq.Context.instance().socket(zmq.ROUTER)
+
+ server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
+ server.curve_secretkey = server_secret
+ server.curve_publickey = server_public
+ server.curve_server = True # must come before bind
+ server.bind(endpoint)
+
+ server_stream = zmqstream.ZMQStream(server)
+ # simple echo
+ server_stream.on_recv_stream(echo)
+ return server_stream
+
+def client_msg_recvd(msg):
+ logging.debug("client recvd %s", msg)
+ logging.info("Ironhouse test OK")
+ # stop the loop when we get the reply
+ ioloop.IOLoop.instance().stop()
+
+def setup_client(client_secret_file, server_public_file, endpoint='tcp://127.0.0.1:9000'):
+ """setup a simple client with CURVE auth"""
+
+ client = zmq.Context.instance().socket(zmq.DEALER)
+
+ # We need two certificates, one for the client and one for
+ # the server. The client must know the server's public key
+ # to make a CURVE connection.
+ client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+ client.curve_secretkey = client_secret
+ client.curve_publickey = client_public
+
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ # The client must know the server's public key to make a CURVE connection.
+ client.curve_serverkey = server_public
+ client.connect(endpoint)
+
+ client_stream = zmqstream.ZMQStream(client)
+ client_stream.on_recv(client_msg_recvd)
+ return client_stream
+
+
+def run():
+ '''Run Ironhouse example'''
+
+ # These direcotries are generated by the generate_certificates script
+ base_dir = os.path.dirname(__file__)
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ if not (os.path.exists(keys_dir) and
+ os.path.exists(public_keys_dir) and
+ os.path.exists(secret_keys_dir)):
+ logging.critical("Certificates are missing - run generate_certificates script first")
+ sys.exit(1)
+
+ # Start an authenticator for this context.
+ auth = IOLoopAuthenticator()
+ auth.allow('127.0.0.1')
+ # Tell authenticator to use the certificate in a directory
+ auth.configure_curve(domain='*', location=public_keys_dir)
+
+ server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
+ server = setup_server(server_secret_file)
+ server_public_file = os.path.join(public_keys_dir, "server.key")
+ client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
+ client = setup_client(client_secret_file, server_public_file)
+ client.send(b'Hello')
+
+ auth.start()
+ ioloop.IOLoop.instance().start()
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ if '-v' in sys.argv:
+ level = logging.DEBUG
+ else:
+ level = logging.INFO
+
+ logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
+
+ run()
diff --git a/examples/security/ironhouse.py b/examples/security/ironhouse.py
new file mode 100644
index 0000000..dc56e41
--- /dev/null
+++ b/examples/security/ironhouse.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+'''
+Ironhouse extends Stonehouse with client public key authentication.
+
+This is the strongest security model we have today, protecting against every
+attack we know about, except end-point attacks (where an attacker plants
+spyware on a machine to capture data before it's encrypted, or after it's
+decrypted).
+
+Author: Chris Laws
+'''
+
+import logging
+import os
+import sys
+
+import zmq
+import zmq.auth
+from zmq.auth.thread import ThreadAuthenticator
+
+
+def run():
+ ''' Run Ironhouse example '''
+
+ # These direcotries are generated by the generate_certificates script
+ base_dir = os.path.dirname(__file__)
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ if not (os.path.exists(keys_dir) and
+ os.path.exists(public_keys_dir) and
+ os.path.exists(secret_keys_dir)):
+ logging.critical("Certificates are missing - run generate_certificates.py script first")
+ sys.exit(1)
+
+ ctx = zmq.Context().instance()
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(ctx)
+ auth.start()
+ auth.allow('127.0.0.1')
+ # Tell authenticator to use the certificate in a directory
+ auth.configure_curve(domain='*', location=public_keys_dir)
+
+ server = ctx.socket(zmq.PUSH)
+
+ server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
+ server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
+ server.curve_secretkey = server_secret
+ server.curve_publickey = server_public
+ server.curve_server = True # must come before bind
+ server.bind('tcp://*:9000')
+
+ client = ctx.socket(zmq.PULL)
+
+ # We need two certificates, one for the client and one for
+ # the server. The client must know the server's public key
+ # to make a CURVE connection.
+ client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
+ client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+ client.curve_secretkey = client_secret
+ client.curve_publickey = client_public
+
+ server_public_file = os.path.join(public_keys_dir, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ # The client must know the server's public key to make a CURVE connection.
+ client.curve_serverkey = server_public
+ client.connect('tcp://127.0.0.1:9000')
+
+ server.send(b"Hello")
+
+ if client.poll(1000):
+ msg = client.recv()
+ if msg == b"Hello":
+ logging.info("Ironhouse test OK")
+ else:
+ logging.error("Ironhouse test FAIL")
+
+ # stop auth thread
+ auth.stop()
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ if '-v' in sys.argv:
+ level = logging.DEBUG
+ else:
+ level = logging.INFO
+
+ logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
+
+ run()
diff --git a/examples/security/stonehouse.py b/examples/security/stonehouse.py
new file mode 100644
index 0000000..679dea8
--- /dev/null
+++ b/examples/security/stonehouse.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+'''
+Stonehouse uses the "CURVE" security mechanism.
+
+This gives us strong encryption on data, and (as far as we know) unbreakable
+authentication. Stonehouse is the minimum you would use over public networks,
+and assures clients that they are speaking to an authentic server, while
+allowing any client to connect.
+
+Author: Chris Laws
+'''
+
+import logging
+import os
+import sys
+import time
+
+import zmq
+import zmq.auth
+from zmq.auth.thread import ThreadAuthenticator
+
+
+def run():
+ ''' Run Stonehouse example '''
+
+ # These directories are generated by the generate_certificates script
+ base_dir = os.path.dirname(__file__)
+ keys_dir = os.path.join(base_dir, 'certificates')
+ public_keys_dir = os.path.join(base_dir, 'public_keys')
+ secret_keys_dir = os.path.join(base_dir, 'private_keys')
+
+ if not (os.path.exists(keys_dir) and
+ os.path.exists(public_keys_dir) and
+ os.path.exists(secret_keys_dir)):
+ logging.critical("Certificates are missing: run generate_certificates.py script first")
+ sys.exit(1)
+
+ ctx = zmq.Context().instance()
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(ctx)
+ auth.start()
+ auth.allow('127.0.0.1')
+ # Tell the authenticator how to handle CURVE requests
+ auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+
+ server = ctx.socket(zmq.PUSH)
+ server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
+ server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
+ server.curve_secretkey = server_secret
+ server.curve_publickey = server_public
+ server.curve_server = True # must come before bind
+ server.bind('tcp://*:9000')
+
+ client = ctx.socket(zmq.PULL)
+ # We need two certificates, one for the client and one for
+ # the server. The client must know the server's public key
+ # to make a CURVE connection.
+ client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
+ client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
+ client.curve_secretkey = client_secret
+ client.curve_publickey = client_public
+
+ # The client must know the server's public key to make a CURVE connection.
+ server_public_file = os.path.join(public_keys_dir, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ client.curve_serverkey = server_public
+
+ client.connect('tcp://127.0.0.1:9000')
+
+ server.send(b"Hello")
+
+ if client.poll(1000):
+ msg = client.recv()
+ if msg == b"Hello":
+ logging.info("Stonehouse test OK")
+ else:
+ logging.error("Stonehouse test FAIL")
+
+ # stop auth thread
+ auth.stop()
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ if '-v' in sys.argv:
+ level = logging.DEBUG
+ else:
+ level = logging.INFO
+
+ logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
+
+ run()
diff --git a/examples/security/strawhouse.py b/examples/security/strawhouse.py
new file mode 100644
index 0000000..dc75bd7
--- /dev/null
+++ b/examples/security/strawhouse.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+
+'''
+Allow or deny clients based on IP address.
+
+Strawhouse, which is plain text with filtering on IP addresses. It still
+uses the NULL mechanism, but we install an authentication hook that checks
+the IP address against a whitelist or blacklist and allows or denies it
+accordingly.
+
+Author: Chris Laws
+'''
+
+import logging
+import sys
+
+import zmq
+import zmq.auth
+from zmq.auth.thread import ThreadAuthenticator
+
+
+def run():
+ '''Run strawhouse client'''
+
+ allow_test_pass = False
+ deny_test_pass = False
+
+ ctx = zmq.Context().instance()
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(ctx)
+ auth.start()
+
+ # Part 1 - demonstrate allowing clients based on IP address
+ auth.allow('127.0.0.1')
+
+ server = ctx.socket(zmq.PUSH)
+ server.zap_domain = b'global' # must come before bind
+ server.bind('tcp://*:9000')
+
+ client_allow = ctx.socket(zmq.PULL)
+ client_allow.connect('tcp://127.0.0.1:9000')
+
+ server.send(b"Hello")
+
+ msg = client_allow.recv()
+ if msg == b"Hello":
+ allow_test_pass = True
+
+ client_allow.close()
+
+ # Part 2 - demonstrate denying clients based on IP address
+ auth.stop()
+
+ auth = ThreadAuthenticator(ctx)
+ auth.start()
+
+ auth.deny('127.0.0.1')
+
+ client_deny = ctx.socket(zmq.PULL)
+ client_deny.connect('tcp://127.0.0.1:9000')
+
+ if server.poll(50, zmq.POLLOUT):
+ server.send(b"Hello")
+
+ if client_deny.poll(50):
+ msg = client_deny.recv()
+ else:
+ deny_test_pass = True
+ else:
+ deny_test_pass = True
+
+ client_deny.close()
+
+ auth.stop() # stop auth thread
+
+ if allow_test_pass and deny_test_pass:
+ logging.info("Strawhouse test OK")
+ else:
+ logging.error("Strawhouse test FAIL")
+
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ if '-v' in sys.argv:
+ level = logging.DEBUG
+ else:
+ level = logging.INFO
+
+ logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
+
+ run()
diff --git a/examples/security/woodhouse.py b/examples/security/woodhouse.py
new file mode 100644
index 0000000..efedee4
--- /dev/null
+++ b/examples/security/woodhouse.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+
+'''
+Woodhouse extends Strawhouse with a name and password check.
+
+This uses the PLAIN mechanism which does plain-text username and password authentication).
+It's not really secure, and anyone sniffing the network (trivial with WiFi)
+can capture passwords and then login.
+
+Author: Chris Laws
+'''
+
+import logging
+import sys
+
+import zmq
+import zmq.auth
+from zmq.auth.thread import ThreadAuthenticator
+
+def run():
+ '''Run woodhouse example'''
+
+ valid_client_test_pass = False
+ invalid_client_test_pass = False
+
+ ctx = zmq.Context().instance()
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(ctx)
+ auth.start()
+ auth.allow('127.0.0.1')
+ # Instruct authenticator to handle PLAIN requests
+ auth.configure_plain(domain='*', passwords={'admin': 'secret'})
+
+ server = ctx.socket(zmq.PUSH)
+ server.plain_server = True # must come before bind
+ server.bind('tcp://*:9000')
+
+ client = ctx.socket(zmq.PULL)
+ client.plain_username = b'admin'
+ client.plain_password = b'secret'
+ client.connect('tcp://127.0.0.1:9000')
+
+ server.send(b"Hello")
+
+ if client.poll():
+ msg = client.recv()
+ if msg == b"Hello":
+ valid_client_test_pass = True
+
+ client.close()
+
+
+ # now use invalid credentials - expect no msg received
+ client2 = ctx.socket(zmq.PULL)
+ client2.plain_username = b'admin'
+ client2.plain_password = b'bogus'
+ client2.connect('tcp://127.0.0.1:9000')
+
+ server.send(b"World")
+
+ if client2.poll(50):
+ msg = client.recv()
+ if msg == "World":
+ invalid_client_test_pass = False
+ else:
+ # no message is expected
+ invalid_client_test_pass = True
+
+ # stop auth thread
+ auth.stop()
+
+ if valid_client_test_pass and invalid_client_test_pass:
+ logging.info("Woodhouse test OK")
+ else:
+ logging.error("Woodhouse test FAIL")
+
+
+if __name__ == '__main__':
+ if zmq.zmq_version_info() < (4,0):
+ raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version()))
+
+ if '-v' in sys.argv:
+ level = logging.DEBUG
+ else:
+ level = logging.INFO
+
+ logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
+
+ run()
diff --git a/examples/serialization/serialsocket.py b/examples/serialization/serialsocket.py
new file mode 100644
index 0000000..7329bb9
--- /dev/null
+++ b/examples/serialization/serialsocket.py
@@ -0,0 +1,74 @@
+"""A Socket subclass that adds some serialization methods."""
+
+import zlib
+import pickle
+
+import numpy
+
+import zmq
+
+class SerializingSocket(zmq.Socket):
+ """A class with some extra serialization methods
+
+ send_zipped_pickle is just like send_pyobj, but uses
+ zlib to compress the stream before sending.
+
+ send_array sends numpy arrays with metadata necessary
+ for reconstructing the array on the other side (dtype,shape).
+ """
+
+ def send_zipped_pickle(self, obj, flags=0, protocol=-1):
+ """pack and compress an object with pickle and zlib."""
+ pobj = pickle.dumps(obj, protocol)
+ zobj = zlib.compress(pobj)
+ print('zipped pickle is %i bytes' % len(zobj))
+ return self.send(zobj, flags=flags)
+
+ def recv_zipped_pickle(self, flags=0):
+ """reconstruct a Python object sent with zipped_pickle"""
+ zobj = self.recv(flags)
+ pobj = zlib.decompress(zobj)
+ return pickle.loads(pobj)
+
+ def send_array(self, A, flags=0, copy=True, track=False):
+ """send a numpy array with metadata"""
+ md = dict(
+ dtype = str(A.dtype),
+ shape = A.shape,
+ )
+ self.send_json(md, flags|zmq.SNDMORE)
+ return self.send(A, flags, copy=copy, track=track)
+
+ def recv_array(self, flags=0, copy=True, track=False):
+ """recv a numpy array"""
+ md = self.recv_json(flags=flags)
+ msg = self.recv(flags=flags, copy=copy, track=track)
+ A = numpy.frombuffer(msg, dtype=md['dtype'])
+ return A.reshape(md['shape'])
+
+class SerializingContext(zmq.Context):
+ _socket_class = SerializingSocket
+
+def main():
+ ctx = SerializingContext()
+ req = ctx.socket(zmq.REQ)
+ rep = ctx.socket(zmq.REP)
+
+ rep.bind('inproc://a')
+ req.connect('inproc://a')
+ A = numpy.ones((1024,1024))
+ print ("Array is %i bytes" % (len(A) * 8))
+
+ # send/recv with pickle+zip
+ req.send_zipped_pickle(A)
+ B = rep.recv_zipped_pickle()
+ # now try non-copying version
+ rep.send_array(A, copy=False)
+ C = req.recv_array(copy=False)
+ print ("Checking zipped pickle...")
+ print ("Okay" if (A==B).all() else "Failed")
+ print ("Checking send_array...")
+ print ("Okay" if (C==B).all() else "Failed")
+
+if __name__ == '__main__':
+ main() \ No newline at end of file
diff --git a/examples/win32-interrupt/display.py b/examples/win32-interrupt/display.py
new file mode 100644
index 0000000..25da5d9
--- /dev/null
+++ b/examples/win32-interrupt/display.py
@@ -0,0 +1,45 @@
+"""The display part of a simply two process chat app."""
+
+# This file has been placed in the public domain.
+
+
+import zmq
+from zmq.utils.win32 import allow_interrupt
+
+
+def main(addrs):
+ context = zmq.Context()
+ control = context.socket(zmq.PUB)
+ control.bind('inproc://control')
+ updates = context.socket(zmq.SUB)
+ updates.setsockopt(zmq.SUBSCRIBE, "")
+ updates.connect('inproc://control')
+ for addr in addrs:
+ print "Connecting to: ", addr
+ updates.connect(addr)
+
+ def interrupt_polling():
+ """Fix CTRL-C on Windows using "self pipe trick"."""
+ control.send_multipart(['', 'quit'])
+
+ with allow_interrupt(interrupt_polling):
+ message = ''
+ while message != 'quit':
+ message = updates.recv_multipart()
+ if len(message) < 2:
+ print 'Invalid message.'
+ continue
+ account = message[0]
+ message = ' '.join(message[1:])
+ if message == 'quit':
+ print 'Killed by "%s".' % account
+ break
+ print '%s: %s' % (account, message)
+
+
+if __name__ == '__main__':
+ import sys
+ if len(sys.argv) < 2:
+ print "usage: display.py <address> [,<address>...]"
+ raise SystemExit
+ main(sys.argv[1:])
diff --git a/examples/win32-interrupt/prompt.py b/examples/win32-interrupt/prompt.py
new file mode 100644
index 0000000..96dc5c6
--- /dev/null
+++ b/examples/win32-interrupt/prompt.py
@@ -0,0 +1,39 @@
+"""The prompt part of a simply two process chat app."""
+
+#
+# Copyright (c) 2010 Andrew Gwozdziewycz
+#
+# This file is part of pyzmq.
+#
+# pyzmq is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# pyzmq is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import zmq
+
+def main(addr, account):
+
+ ctx = zmq.Context()
+ socket = ctx.socket(zmq.PUB)
+ socket.bind(addr)
+
+ while True:
+ message = raw_input("%s> " % account)
+ socket.send_multipart((account, message))
+
+
+if __name__ == '__main__':
+ import sys
+ if len(sys.argv) != 3:
+ print "usage: prompt.py <address> <username>"
+ raise SystemExit
+ main(sys.argv[1], sys.argv[2])