From cce638a8adf4e045ca5505afea4bda57753c31dd Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Mon, 11 Aug 2014 16:33:29 -0400 Subject: initial import of debian package --- examples/LICENSE | 3 + examples/README_PY3K | 10 +++ examples/bench/benchmark.py | 25 +++++++ examples/bench/jsonrpc_client.py | 4 + examples/bench/jsonrpc_server.py | 8 ++ examples/bench/latency.png | Bin 0 -> 58452 bytes examples/bench/msgs_sec.png | Bin 0 -> 56500 bytes examples/bench/msgs_sec_log.png | Bin 0 -> 59966 bytes examples/bench/msgs_sec_ratio.png | Bin 0 -> 39876 bytes examples/bench/plot_latency.py | 84 +++++++++++++++++++++ examples/bench/pyro_client.py | 3 + examples/bench/pyro_server.py | 14 ++++ examples/bench/pyzmq_client.py | 16 ++++ examples/bench/pyzmq_server.py | 10 +++ examples/bench/xmlrpc_client.py | 6 ++ examples/bench/xmlrpc_server.py | 8 ++ examples/chat/display.py | 41 +++++++++++ examples/chat/prompt.py | 39 ++++++++++ examples/device/client.py | 38 ++++++++++ examples/device/server.py | 52 +++++++++++++ examples/eventloop/asyncweb.py | 96 ++++++++++++++++++++++++ examples/eventloop/echo.py | 27 +++++++ examples/eventloop/echostream.py | 24 ++++++ examples/eventloop/web.py | 46 ++++++++++++ examples/gevent/poll.py | 42 +++++++++++ examples/gevent/reqrep.py | 47 ++++++++++++ examples/gevent/simple.py | 37 ++++++++++ examples/heartbeat/heart.py | 34 +++++++++ examples/heartbeat/heartbeater.py | 90 +++++++++++++++++++++++ examples/heartbeat/ping.py | 34 +++++++++ examples/heartbeat/pong.py | 34 +++++++++ examples/logger/zmqlogger.py | 70 ++++++++++++++++++ examples/mongodb/client.py | 46 ++++++++++++ examples/mongodb/controller.py | 91 +++++++++++++++++++++++ examples/monitoring/simple_monitor.py | 90 +++++++++++++++++++++++ examples/poll/pair.py | 56 ++++++++++++++ examples/poll/pubsub.py | 57 +++++++++++++++ examples/poll/reqrep.py | 71 ++++++++++++++++++ examples/pubsub/publisher.py | 57 +++++++++++++++ examples/pubsub/subscriber.py | 74 +++++++++++++++++++ examples/pubsub/topics_pub.py | 64 ++++++++++++++++ examples/pubsub/topics_sub.py | 56 ++++++++++++++ examples/security/generate_certificates.py | 49 +++++++++++++ examples/security/grasslands.py | 29 ++++++++ examples/security/ioloop-ironhouse.py | 114 +++++++++++++++++++++++++++++ examples/security/ironhouse.py | 93 +++++++++++++++++++++++ examples/security/stonehouse.py | 93 +++++++++++++++++++++++ examples/security/strawhouse.py | 94 ++++++++++++++++++++++++ examples/security/woodhouse.py | 90 +++++++++++++++++++++++ examples/serialization/serialsocket.py | 74 +++++++++++++++++++ 50 files changed, 2240 insertions(+) create mode 100644 examples/LICENSE create mode 100644 examples/README_PY3K create mode 100644 examples/bench/benchmark.py create mode 100644 examples/bench/jsonrpc_client.py create mode 100644 examples/bench/jsonrpc_server.py create mode 100644 examples/bench/latency.png create mode 100644 examples/bench/msgs_sec.png create mode 100644 examples/bench/msgs_sec_log.png create mode 100644 examples/bench/msgs_sec_ratio.png create mode 100644 examples/bench/plot_latency.py create mode 100644 examples/bench/pyro_client.py create mode 100644 examples/bench/pyro_server.py create mode 100644 examples/bench/pyzmq_client.py create mode 100644 examples/bench/pyzmq_server.py create mode 100644 examples/bench/xmlrpc_client.py create mode 100644 examples/bench/xmlrpc_server.py create mode 100644 examples/chat/display.py create mode 100644 examples/chat/prompt.py create mode 100644 examples/device/client.py create mode 100644 examples/device/server.py create mode 100644 examples/eventloop/asyncweb.py create mode 100644 examples/eventloop/echo.py create mode 100644 examples/eventloop/echostream.py create mode 100644 examples/eventloop/web.py create mode 100644 examples/gevent/poll.py create mode 100644 examples/gevent/reqrep.py create mode 100644 examples/gevent/simple.py create mode 100644 examples/heartbeat/heart.py create mode 100644 examples/heartbeat/heartbeater.py create mode 100644 examples/heartbeat/ping.py create mode 100644 examples/heartbeat/pong.py create mode 100644 examples/logger/zmqlogger.py create mode 100644 examples/mongodb/client.py create mode 100644 examples/mongodb/controller.py create mode 100644 examples/monitoring/simple_monitor.py create mode 100644 examples/poll/pair.py create mode 100644 examples/poll/pubsub.py create mode 100644 examples/poll/reqrep.py create mode 100644 examples/pubsub/publisher.py create mode 100644 examples/pubsub/subscriber.py create mode 100755 examples/pubsub/topics_pub.py create mode 100755 examples/pubsub/topics_sub.py create mode 100644 examples/security/generate_certificates.py create mode 100644 examples/security/grasslands.py create mode 100644 examples/security/ioloop-ironhouse.py create mode 100644 examples/security/ironhouse.py create mode 100644 examples/security/stonehouse.py create mode 100644 examples/security/strawhouse.py create mode 100644 examples/security/woodhouse.py create mode 100644 examples/serialization/serialsocket.py (limited to 'examples') diff --git a/examples/LICENSE b/examples/LICENSE new file mode 100644 index 0000000..d4d3950 --- /dev/null +++ b/examples/LICENSE @@ -0,0 +1,3 @@ +PyZMQ examples are copyright their respective authors, and licensed +under the New BSD License as described in COPYING.BSD unless otherwise +specified in the file. \ No newline at end of file diff --git a/examples/README_PY3K b/examples/README_PY3K new file mode 100644 index 0000000..d5272d0 --- /dev/null +++ b/examples/README_PY3K @@ -0,0 +1,10 @@ +These examples use Python2 syntax. Due to the change in Python from bytestring str objects +to unicode str objects, 2to3 does not perform an adequate transform of the code. Examples +can be valid on both Python2.5 and Python3, but such code is less readable than it should be. + +As a result, the Python3 examples are kept in a separate repo: + +https://github.com/minrk/pyzmq-py3k-examples + + +The differences are very small, but important. \ No newline at end of file diff --git a/examples/bench/benchmark.py b/examples/bench/benchmark.py new file mode 100644 index 0000000..c379af9 --- /dev/null +++ b/examples/bench/benchmark.py @@ -0,0 +1,25 @@ +from timeit import default_timer as timer + +def benchmark(f, size, reps): + msg = size*'0' + t1 = timer() + for i in range(reps): + msg2 = f(msg) + assert msg == msg2 + t2 = timer() + diff = (t2-t1) + latency = diff/reps + return latency*1000000 + +kB = [1000*2**n for n in range(10)] +MB = [1000000*2**n for n in range(8)] +sizes = [1] + kB + MB + +def benchmark_set(f, sizes, reps): + latencies = [] + for size, rep in zip(sizes, reps): + print "Running benchmark with %r reps of %r bytes" % (rep, size) + lat = benchmark(f, size, rep) + latencies.append(lat) + return sizes, latencies + diff --git a/examples/bench/jsonrpc_client.py b/examples/bench/jsonrpc_client.py new file mode 100644 index 0000000..7fb6ef4 --- /dev/null +++ b/examples/bench/jsonrpc_client.py @@ -0,0 +1,4 @@ +from timeit import default_timer as timer +from jsonrpclib import Server + +client = Server('http://localhost:10000') diff --git a/examples/bench/jsonrpc_server.py b/examples/bench/jsonrpc_server.py new file mode 100644 index 0000000..4500a02 --- /dev/null +++ b/examples/bench/jsonrpc_server.py @@ -0,0 +1,8 @@ +from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer + +def echo(x): + return x + +server = SimpleJSONRPCServer(('localhost',10000)) +server.register_function(echo) +server.serve_forever() \ No newline at end of file diff --git a/examples/bench/latency.png b/examples/bench/latency.png new file mode 100644 index 0000000..bb414b5 Binary files /dev/null and b/examples/bench/latency.png differ diff --git a/examples/bench/msgs_sec.png b/examples/bench/msgs_sec.png new file mode 100644 index 0000000..a7b294b Binary files /dev/null and b/examples/bench/msgs_sec.png differ diff --git a/examples/bench/msgs_sec_log.png b/examples/bench/msgs_sec_log.png new file mode 100644 index 0000000..c3a361e Binary files /dev/null and b/examples/bench/msgs_sec_log.png differ diff --git a/examples/bench/msgs_sec_ratio.png b/examples/bench/msgs_sec_ratio.png new file mode 100644 index 0000000..0a87331 Binary files /dev/null and b/examples/bench/msgs_sec_ratio.png differ diff --git a/examples/bench/plot_latency.py b/examples/bench/plot_latency.py new file mode 100644 index 0000000..f50ef29 --- /dev/null +++ b/examples/bench/plot_latency.py @@ -0,0 +1,84 @@ +"""Plot latency data from messaging benchmarks. + +To generate the data for each library, I started the server and then did +the following for each client:: + + from xmlrpc_client import client + for i in range(9): + s = '0'*10**i + print s + %timeit client.echo(s) +""" + +from matplotlib.pylab import * + +rawdata = """# Data in milliseconds +Bytes JSONRPC PYRO XMLRPC pyzmq_copy pyzmq_nocopy +1 2.15 0.186 2.07 0.111 0.136 +10 2.49 0.187 1.87 0.115 0.137 +100 2.5 0.189 1.9 0.126 0.138 +1000 2.54 0.196 1.91 0.129 0.141 +10000 2.91 0.271 2.77 0.204 0.197 +100000 6.65 1.44 9.17 0.961 0.546 +1000000 50.2 15.8 81.5 8.39 2.25 +10000000 491 159 816 91.7 25.2 +100000000 5010 1560 8300 893 248 + +""" +with open('latency.csv','w') as f: + f.writelines(rawdata) + +data = csv2rec('latency.csv',delimiter='\t') + +loglog(data.bytes, data.xmlrpc*1000, label='XMLRPC') +loglog(data.bytes, data.jsonrpc*1000, label='JSONRPC') +loglog(data.bytes, data.pyro*1000, label='Pyro') +loglog(data.bytes, data.pyzmq_nocopy*1000, label='PyZMQ') +loglog(data.bytes, len(data.bytes)*[60], label='Ping') +legend(loc=2) +title('Latency') +xlabel('Number of bytes') +ylabel('Round trip latency ($\mu s$)') +grid(True) +show() +savefig('latency.png') + +clf() + +semilogx(data.bytes, 1000/data.xmlrpc, label='XMLRPC') +semilogx(data.bytes, 1000/data.jsonrpc, label='JSONRPC') +semilogx(data.bytes, 1000/data.pyro, label='Pyro') +semilogx(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ') +legend(loc=1) +xlabel('Number of bytes') +ylabel('Message/s') +title('Message Throughput') +grid(True) +show() +savefig('msgs_sec.png') + +clf() + +loglog(data.bytes, 1000/data.xmlrpc, label='XMLRPC') +loglog(data.bytes, 1000/data.jsonrpc, label='JSONRPC') +loglog(data.bytes, 1000/data.pyro, label='Pyro') +loglog(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ') +legend(loc=3) +xlabel('Number of bytes') +ylabel('Message/s') +title('Message Throughput') +grid(True) +show() +savefig('msgs_sec_log.png') + +clf() + +semilogx(data.bytes, data.pyro/data.pyzmq_nocopy, label="No-copy") +semilogx(data.bytes, data.pyro/data.pyzmq_copy, label="Copy") +xlabel('Number of bytes') +ylabel('Ratio throughputs') +title('PyZMQ Throughput/Pyro Throughput') +grid(True) +legend(loc=2) +show() +savefig('msgs_sec_ratio.png') diff --git a/examples/bench/pyro_client.py b/examples/bench/pyro_client.py new file mode 100644 index 0000000..5e25feb --- /dev/null +++ b/examples/bench/pyro_client.py @@ -0,0 +1,3 @@ +import Pyro.core + +client = Pyro.core.getProxyForURI("PYROLOC://localhost:7766/echo") \ No newline at end of file diff --git a/examples/bench/pyro_server.py b/examples/bench/pyro_server.py new file mode 100644 index 0000000..a2a2446 --- /dev/null +++ b/examples/bench/pyro_server.py @@ -0,0 +1,14 @@ +import Pyro.core + +class Echo(Pyro.core.ObjBase): + def __init__(self): + Pyro.core.ObjBase.__init__(self) + def echo(self, x): + return x + +Pyro.core.initServer() +daemon=Pyro.core.Daemon() +uri=daemon.connect(Echo(),"echo") + +daemon.requestLoop() + \ No newline at end of file diff --git a/examples/bench/pyzmq_client.py b/examples/bench/pyzmq_client.py new file mode 100644 index 0000000..9afccec --- /dev/null +++ b/examples/bench/pyzmq_client.py @@ -0,0 +1,16 @@ +import zmq + +c = zmq.Context() +s = c.socket(zmq.REQ) +s.connect('tcp://127.0.0.1:10001') + +def echo(msg): + s.send(msg, copy=False) + msg2 = s.recv(copy=False) + return msg2 + +class Client(object): + pass + +client = Client() +client.echo = echo diff --git a/examples/bench/pyzmq_server.py b/examples/bench/pyzmq_server.py new file mode 100644 index 0000000..cab0082 --- /dev/null +++ b/examples/bench/pyzmq_server.py @@ -0,0 +1,10 @@ +import zmq + +c = zmq.Context() +s = c.socket(zmq.REP) +s.bind('tcp://127.0.0.1:10001') + +while True: + msg = s.recv(copy=False) + s.send(msg) + diff --git a/examples/bench/xmlrpc_client.py b/examples/bench/xmlrpc_client.py new file mode 100644 index 0000000..a73ddfd --- /dev/null +++ b/examples/bench/xmlrpc_client.py @@ -0,0 +1,6 @@ +from timeit import default_timer as timer +from xmlrpclib import ServerProxy + +client = ServerProxy('http://localhost:10002') + + \ No newline at end of file diff --git a/examples/bench/xmlrpc_server.py b/examples/bench/xmlrpc_server.py new file mode 100644 index 0000000..24ab019 --- /dev/null +++ b/examples/bench/xmlrpc_server.py @@ -0,0 +1,8 @@ +from SimpleXMLRPCServer import SimpleXMLRPCServer + +def echo(x): + return x + +server = SimpleXMLRPCServer(('localhost',10002)) +server.register_function(echo) +server.serve_forever() \ No newline at end of file diff --git a/examples/chat/display.py b/examples/chat/display.py new file mode 100644 index 0000000..d4e240a --- /dev/null +++ b/examples/chat/display.py @@ -0,0 +1,41 @@ +"""The display part of a simply two process chat app.""" + +# +# Copyright (c) 2010 Andrew Gwozdziewycz +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . + +import zmq + +def main(addrs): + + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.setsockopt(zmq.SUBSCRIBE, "") + for addr in addrs: + print "Connecting to: ", addr + socket.connect(addr) + + while True: + msg = socket.recv_pyobj() + print "%s: %s" % (msg[1], msg[0]) + +if __name__ == '__main__': + import sys + if len(sys.argv) < 2: + print "usage: display.py
[,
...]" + raise SystemExit + main(sys.argv[1:]) diff --git a/examples/chat/prompt.py b/examples/chat/prompt.py new file mode 100644 index 0000000..d9b12ec --- /dev/null +++ b/examples/chat/prompt.py @@ -0,0 +1,39 @@ +"""The prompt part of a simply two process chat app.""" + +# +# Copyright (c) 2010 Andrew Gwozdziewycz +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . + +import zmq + +def main(addr, who): + + ctx = zmq.Context() + socket = ctx.socket(zmq.PUB) + socket.bind(addr) + + while True: + msg = raw_input("%s> " % who) + socket.send_pyobj((msg, who)) + + +if __name__ == '__main__': + import sys + if len(sys.argv) != 3: + print "usage: prompt.py
" + raise SystemExit + main(sys.argv[1], sys.argv[2]) diff --git a/examples/device/client.py b/examples/device/client.py new file mode 100644 index 0000000..14a4e26 --- /dev/null +++ b/examples/device/client.py @@ -0,0 +1,38 @@ +"""A client for the device based server.""" + +# +# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . + +import zmq +import os +from time import time + +print 'Client', os.getpid() + +context = zmq.Context(1) + +socket = context.socket(zmq.REQ) +socket.connect('tcp://127.0.0.1:5555') + +while True: + data = zmq.Message(str(os.getpid())) + start = time() + socket.send(data) + data = socket.recv() + print time()-start, data + diff --git a/examples/device/server.py b/examples/device/server.py new file mode 100644 index 0000000..b29a3c1 --- /dev/null +++ b/examples/device/server.py @@ -0,0 +1,52 @@ +"""A device based server.""" + +# +# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . + +import zmq +import os +import threading +import time + +print 'Server', os.getpid() + +def routine(context): + socket = context.socket(zmq.REP) + + socket.connect("inproc://workers") + + while True: + message = socket.recv() + time.sleep(1) + socket.send(message) + +context = zmq.Context(1) + +workers = context.socket(zmq.DEALER) +workers.bind("inproc://workers"); + +clients = context.socket(zmq.DEALER) +clients.bind('tcp://127.0.0.1:5555') + +for i in range(10): + thread = threading.Thread(target=routine, args=(context, )) + thread.start() + +zmq.device(zmq.QUEUE, clients, workers) + +print "Finished" diff --git a/examples/eventloop/asyncweb.py b/examples/eventloop/asyncweb.py new file mode 100644 index 0000000..06b03f3 --- /dev/null +++ b/examples/eventloop/asyncweb.py @@ -0,0 +1,96 @@ +"""Async web request example with tornado. + +Requests to localhost:8888 will be relayed via 0MQ to a slow responder, +who will take 1-5 seconds to respond. The tornado app will remain responsive +duriung this time, and when the worker replies, the web request will finish. + +A '.' is printed every 100ms to demonstrate that the zmq request is not blocking +the event loop. +""" + + +import sys +import random +import threading +import time + +import zmq +from zmq.eventloop import ioloop, zmqstream + +""" +ioloop.install() must be called prior to instantiating *any* tornado objects, +and ideally before importing anything from tornado, just to be safe. + +install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's +IOLoop. If this is not done properly, multiple IOLoop instances may be +created, which will have the effect of some subset of handlers never being +called, because only one loop will be running. +""" + +ioloop.install() + +import tornado +from tornado import web + + +def slow_responder(): + """thread for slowly responding to replies.""" + ctx = zmq.Context.instance() + socket = ctx.socket(zmq.REP) + socket.linger = 0 + socket.bind('tcp://127.0.0.1:5555') + i=0 + while True: + msg = socket.recv() + print "\nworker received %r\n" % msg + time.sleep(random.randint(1,5)) + socket.send(msg + " to you too, #%i" % i) + i+=1 + +def dot(): + """callback for showing that IOLoop is still responsive while we wait""" + sys.stdout.write('.') + sys.stdout.flush() + +def printer(msg): + print (msg) + +class TestHandler(web.RequestHandler): + + @web.asynchronous + def get(self): + ctx = zmq.Context.instance() + s = ctx.socket(zmq.REQ) + s.connect('tcp://127.0.0.1:5555') + # send request to worker + s.send('hello') + loop = ioloop.IOLoop.instance() + self.stream = zmqstream.ZMQStream(s) + self.stream.on_recv(self.handle_reply) + + def handle_reply(self, msg): + # finish web request with worker's reply + reply = msg[0] + print "\nfinishing with %r\n" % reply, + self.stream.close() + self.write(reply) + self.finish() + +def main(): + worker = threading.Thread(target=slow_responder) + worker.daemon=True + worker.start() + + application = web.Application([(r"/", TestHandler)]) + beat = ioloop.PeriodicCallback(dot, 100) + beat.start() + application.listen(8888) + try: + ioloop.IOLoop.instance().start() + except KeyboardInterrupt: + print ' Interrupted' + + +if __name__ == "__main__": + main() + diff --git a/examples/eventloop/echo.py b/examples/eventloop/echo.py new file mode 100644 index 0000000..9be079b --- /dev/null +++ b/examples/eventloop/echo.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +"""A trivial ZMQ echo server using the eventloop. + +Authors +------- +* MinRK +""" + +import zmq +from zmq.eventloop import ioloop + +loop = ioloop.IOLoop.instance() + +ctx = zmq.Context() +s = ctx.socket(zmq.REP) +s.bind('tcp://127.0.0.1:5555') + +def rep_handler(sock, events): + # We don't know how many recv's we can do? + msg = sock.recv() + # No guarantee that we can do the send. We need a way of putting the + # send in the event loop. + sock.send(msg) + +loop.add_handler(s, rep_handler, zmq.POLLIN) + +loop.start() \ No newline at end of file diff --git a/examples/eventloop/echostream.py b/examples/eventloop/echostream.py new file mode 100644 index 0000000..04c1532 --- /dev/null +++ b/examples/eventloop/echostream.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +"""Adapted echo.py to put the send in the event loop using a ZMQStream. + +Authors +------- +* MinRK +""" + +import zmq +from zmq.eventloop import ioloop, zmqstream +loop = ioloop.IOLoop.instance() + +ctx = zmq.Context() +s = ctx.socket(zmq.REP) +s.bind('tcp://127.0.0.1:5555') +stream = zmqstream.ZMQStream(s, loop) + +def echo(msg): + print " ".join(msg) + stream.send_multipart(msg) + +stream.on_recv(echo) + +loop.start() \ No newline at end of file diff --git a/examples/eventloop/web.py b/examples/eventloop/web.py new file mode 100644 index 0000000..1285f95 --- /dev/null +++ b/examples/eventloop/web.py @@ -0,0 +1,46 @@ +import zmq +from zmq.eventloop import ioloop, zmqstream + +""" +ioloop.install() must be called prior to instantiating *any* tornado objects, +and ideally before importing anything from tornado, just to be safe. + +install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's +IOLoop. If this is not done properly, multiple IOLoop instances may be +created, which will have the effect of some subset of handlers never being +called, because only one loop will be running. +""" + +ioloop.install() + +import tornado +import tornado.web + + +""" +this application can be used with echostream.py, start echostream.py, +start web.py, then every time you hit http://localhost:8888/, +echostream.py will print out 'hello' +""" + +def printer(msg): + print (msg) + +ctx = zmq.Context() +s = ctx.socket(zmq.REQ) +s.connect('tcp://127.0.0.1:5555') +stream = zmqstream.ZMQStream(s) +stream.on_recv(printer) + +class TestHandler(tornado.web.RequestHandler): + def get(self): + print ("sending hello") + stream.send("hello") + self.write("hello") +application = tornado.web.Application([(r"/", TestHandler)]) + +if __name__ == "__main__": + application.listen(8888) + ioloop.IOLoop.instance().start() + + diff --git a/examples/gevent/poll.py b/examples/gevent/poll.py new file mode 100644 index 0000000..1daf80a --- /dev/null +++ b/examples/gevent/poll.py @@ -0,0 +1,42 @@ +import gevent +from zmq import green as zmq + +# Connect to both receiving sockets and send 10 messages +def sender(): + + sender = context.socket(zmq.PUSH) + sender.connect('inproc://polltest1') + sender.connect('inproc://polltest2') + + for i in xrange(10): + sender.send('test %d' % i) + gevent.sleep(1) + + +# create zmq context, and bind to pull sockets +context = zmq.Context() +receiver1 = context.socket(zmq.PULL) +receiver1.bind('inproc://polltest1') +receiver2 = context.socket(zmq.PULL) +receiver2.bind('inproc://polltest2') + +gevent.spawn(sender) + +# Create poller and register both reciever sockets +poller = zmq.Poller() +poller.register(receiver1, zmq.POLLIN) +poller.register(receiver2, zmq.POLLIN) + +# Read 10 messages from both reciever sockets +msgcnt = 0 +while msgcnt < 10: + socks = dict(poller.poll()) + if receiver1 in socks and socks[receiver1] == zmq.POLLIN: + print "Message from receiver1: %s" % receiver1.recv() + msgcnt += 1 + + if receiver2 in socks and socks[receiver2] == zmq.POLLIN: + print "Message from receiver2: %s" % receiver2.recv() + msgcnt += 1 + +print "%d messages received" % msgcnt diff --git a/examples/gevent/reqrep.py b/examples/gevent/reqrep.py new file mode 100644 index 0000000..2a4f307 --- /dev/null +++ b/examples/gevent/reqrep.py @@ -0,0 +1,47 @@ +""" +Complex example which is a combination of the rr* examples from the zguide. +""" +from gevent import spawn +import zmq.green as zmq + +# server +context = zmq.Context() +socket = context.socket(zmq.REP) +socket.connect("tcp://localhost:5560") + +def serve(socket): + while True: + message = socket.recv() + print "Received request: ", message + socket.send("World") +server = spawn(serve, socket) + + +# client +context = zmq.Context() +socket = context.socket(zmq.REQ) +socket.connect("tcp://localhost:5559") + +# Do 10 requests, waiting each time for a response +def client(): + for request in range(1,10): + socket.send("Hello") + message = socket.recv() + print "Received reply ", request, "[", message, "]" + + +# broker +frontend = context.socket(zmq.ROUTER) +backend = context.socket(zmq.DEALER); +frontend.bind("tcp://*:5559") +backend.bind("tcp://*:5560") + +def proxy(socket_from, socket_to): + while True: + m = socket_from.recv_multipart() + socket_to.send_multipart(m) + +a = spawn(proxy, frontend, backend) +b = spawn(proxy, backend, frontend) + +spawn(client).join() diff --git a/examples/gevent/simple.py b/examples/gevent/simple.py new file mode 100644 index 0000000..ae065b3 --- /dev/null +++ b/examples/gevent/simple.py @@ -0,0 +1,37 @@ +from gevent import spawn, spawn_later +import zmq.green as zmq + +# server +print zmq.Context +ctx = zmq.Context() +sock = ctx.socket(zmq.PUSH) +sock.bind('ipc:///tmp/zmqtest') + +spawn(sock.send_pyobj, ('this', 'is', 'a', 'python', 'tuple')) +spawn_later(1, sock.send_pyobj, {'hi': 1234}) +spawn_later(2, sock.send_pyobj, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42)) +spawn_later(3, sock.send_pyobj, 'foobar') +spawn_later(4, sock.send_pyobj, 'quit') + + +# client +ctx = zmq.Context() # create a new context to kick the wheels +sock = ctx.socket(zmq.PULL) +sock.connect('ipc:///tmp/zmqtest') + +def get_objs(sock): + while True: + o = sock.recv_pyobj() + print 'received python object:', o + if o == 'quit': + print 'exiting.' + break + +def print_every(s, t=None): + print s + if t: + spawn_later(t, print_every, s, t) + +print_every('printing every half second', 0.5) +spawn(get_objs, sock).join() + diff --git a/examples/heartbeat/heart.py b/examples/heartbeat/heart.py new file mode 100644 index 0000000..175370e --- /dev/null +++ b/examples/heartbeat/heart.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""This launches an echoing rep socket device, +and runs a blocking numpy action. The rep socket should +remain responsive to pings during this time. Use heartbeater.py to +ping this heart, and see the responsiveness. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq +from zmq import devices + +ctx = zmq.Context() + +dev = devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER) +dev.setsockopt_in(zmq.SUBSCRIBE, "") +dev.connect_in('tcp://127.0.0.1:5555') +dev.connect_out('tcp://127.0.0.1:5556') +dev.start() + +#wait for connections +time.sleep(1) + +A = numpy.random.random((2**11,2**11)) +print "starting blocking loop" +while True: + tic = time.time() + numpy.dot(A,A.transpose()) + print "blocked for %.3f s"%(time.time()-tic) + diff --git a/examples/heartbeat/heartbeater.py b/examples/heartbeat/heartbeater.py new file mode 100644 index 0000000..180828a --- /dev/null +++ b/examples/heartbeat/heartbeater.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +""" + +For use with heart.py + +A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts +are tracked based on their DEALER identities. + +You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding. + +Authors +------- +* MinRK +""" + +import time +import zmq +from zmq.eventloop import ioloop, zmqstream + + +class HeartBeater(object): + """A basic HeartBeater class + pingstream: a PUB stream + pongstream: an ROUTER stream""" + + def __init__(self, loop, pingstream, pongstream, period=1000): + self.loop = loop + self.period = period + + self.pingstream = pingstream + self.pongstream = pongstream + self.pongstream.on_recv(self.handle_pong) + + self.hearts = set() + self.responses = set() + self.lifetime = 0 + self.tic = time.time() + + self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop) + self.caller.start() + + def beat(self): + toc = time.time() + self.lifetime += toc-self.tic + self.tic = toc + print self.lifetime + # self.message = str(self.lifetime) + goodhearts = self.hearts.intersection(self.responses) + heartfailures = self.hearts.difference(goodhearts) + newhearts = self.responses.difference(goodhearts) + # print newhearts, goodhearts, heartfailures + map(self.handle_new_heart, newhearts) + map(self.handle_heart_failure, heartfailures) + self.responses = set() + print "%i beating hearts: %s"%(len(self.hearts),self.hearts) + self.pingstream.send(str(self.lifetime)) + + def handle_new_heart(self, heart): + print "yay, got new heart %s!"%heart + self.hearts.add(heart) + + def handle_heart_failure(self, heart): + print "Heart %s failed :("%heart + self.hearts.remove(heart) + + + def handle_pong(self, msg): + "if heart is beating" + if msg[1] == str(self.lifetime): + self.responses.add(msg[0]) + else: + print "got bad heartbeat (possibly old?): %s"%msg[1] + +# sub.setsockopt(zmq.SUBSCRIBE) + + +if __name__ == '__main__': + loop = ioloop.IOLoop() + context = zmq.Context() + pub = context.socket(zmq.PUB) + pub.bind('tcp://127.0.0.1:5555') + router = context.socket(zmq.ROUTER) + router.bind('tcp://127.0.0.1:5556') + + outstream = zmqstream.ZMQStream(pub, loop) + instream = zmqstream.ZMQStream(router, loop) + + hb = HeartBeater(loop, outstream, instream) + + loop.start() diff --git a/examples/heartbeat/ping.py b/examples/heartbeat/ping.py new file mode 100644 index 0000000..933a39a --- /dev/null +++ b/examples/heartbeat/ping.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""For use with pong.py + +This script simply pings a process started by pong.py or tspong.py, to +demonstrate that zmq remains responsive while Python blocks. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq + +ctx = zmq.Context() + +req = ctx.socket(zmq.REQ) +req.connect('tcp://127.0.0.1:10111') + +#wait for connects +time.sleep(1) +n=0 +while True: + time.sleep(numpy.random.random()) + for i in range(4): + n+=1 + msg = 'ping %i'%n + tic = time.time() + req.send(msg) + resp = req.recv() + print "%s: %.2f ms" % (msg, 1000*(time.time()-tic)) + assert msg == resp + diff --git a/examples/heartbeat/pong.py b/examples/heartbeat/pong.py new file mode 100644 index 0000000..47efb3a --- /dev/null +++ b/examples/heartbeat/pong.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +"""This launches an echoing rep socket device using +zmq.devices.ThreadDevice, and runs a blocking numpy action. +The rep socket should remain responsive to pings during this time. + +Use ping.py to see how responsive it is. + +Authors +------- +* MinRK +""" + +import time +import numpy +import zmq +from zmq import devices + +ctx = zmq.Context() + +dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1) +dev.bind_in('tcp://127.0.0.1:10111') +dev.setsockopt_in(zmq.IDENTITY, "whoda") +dev.start() + +#wait for connections +time.sleep(1) + +A = numpy.random.random((2**11,2**12)) +print "starting blocking loop" +while True: + tic = time.time() + numpy.dot(A,A.transpose()) + print "blocked for %.3f s"%(time.time()-tic) + diff --git a/examples/logger/zmqlogger.py b/examples/logger/zmqlogger.py new file mode 100644 index 0000000..c55b51b --- /dev/null +++ b/examples/logger/zmqlogger.py @@ -0,0 +1,70 @@ +""" +Simple example of using zmq log handlers + +This starts a number of subprocesses with PUBHandlers that generate +log messages at a regular interval. The main process has a SUB socket, +which aggregates and logs all of the messages to the root logger. +""" + +import logging +from multiprocessing import Process +import os +import random +import sys +import time + +import zmq +from zmq.log.handlers import PUBHandler + +LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL) + +def sub_logger(port, level=logging.DEBUG): + ctx = zmq.Context() + sub = ctx.socket(zmq.SUB) + sub.bind('tcp://127.0.0.1:%i' % port) + sub.setsockopt(zmq.SUBSCRIBE, "") + logging.basicConfig(level=level) + + while True: + level, message = sub.recv_multipart() + if message.endswith('\n'): + # trim trailing newline, which will get appended again + message = message[:-1] + log = getattr(logging, level.lower()) + log(message) + +def log_worker(port, interval=1, level=logging.DEBUG): + ctx = zmq.Context() + pub = ctx.socket(zmq.PUB) + pub.connect('tcp://127.0.0.1:%i' % port) + + logger = logging.getLogger(str(os.getpid())) + logger.setLevel(level) + handler = PUBHandler(pub) + logger.addHandler(handler) + print "starting logger at %i with level=%s" % (os.getpid(), level) + + while True: + level = random.choice(LOG_LEVELS) + logger.log(level, "Hello from %i!" % os.getpid()) + time.sleep(interval) + +if __name__ == '__main__': + if len(sys.argv) > 1: + n = int(sys.argv[1]) + else: + n = 2 + + port = 5555 + + # start the log generators + workers = [ Process(target=log_worker, args=(port,), kwargs=dict(level=random.choice(LOG_LEVELS))) for i in range(n) ] + [ w.start() for w in workers ] + + # start the log watcher + try: + sub_logger(port) + except KeyboardInterrupt: + pass + finally: + [ w.terminate() for w in workers ] diff --git a/examples/mongodb/client.py b/examples/mongodb/client.py new file mode 100644 index 0000000..839dce7 --- /dev/null +++ b/examples/mongodb/client.py @@ -0,0 +1,46 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import json +import zmq + +class MongoZMQClient(object): + """ + Client that connects with MongoZMQ server to add/fetch docs + """ + + def __init__(self, connect_addr='tcp://127.0.0.1:5000'): + self._context = zmq.Context() + self._socket = self._context.socket(zmq.DEALER) + self._socket.connect(connect_addr) + + def _send_recv_msg(self, msg): + self._socket.send_multipart(msg) + return self._socket.recv_multipart()[0] + + def get_doc(self, keys): + msg = ['get', json.dumps(keys)] + json_str = self._send_recv_msg(msg) + return json.loads(json_str) + + def add_doc(self, doc): + msg = ['add', json.dumps(doc)] + return self._send_recv_msg(msg) + +def main(): + client = MongoZMQClient() + for i in range(10): + doc = {'job': str(i)} + print "Adding doc", doc + print client.add_doc(doc) + for i in range(10): + query = {'job': str(i)} + print "Getting doc matching query:", query + print client.get_doc(query) + +if __name__ == "__main__": + main() diff --git a/examples/mongodb/controller.py b/examples/mongodb/controller.py new file mode 100644 index 0000000..e154f1c --- /dev/null +++ b/examples/mongodb/controller.py @@ -0,0 +1,91 @@ +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Justin Riley +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import zmq +import pymongo +import pymongo.json_util +import json + +class MongoZMQ(object): + """ + ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB. + + NOTE: mongod must be started before using this class + """ + + def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"): + """ + bind_addr: address to bind zmq socket on + db_name: name of database to write to (created if doesnt exist) + table_name: name of mongodb 'table' in the db to write to (created if doesnt exist) + """ + self._bind_addr = bind_addr + self._db_name = db_name + self._table_name = table_name + self._conn = pymongo.Connection() + self._db = self._conn[self._db_name] + self._table = self._db[self._table_name] + + def _doc_to_json(self, doc): + return json.dumps(doc,default=pymongo.json_util.default) + + def add_document(self, doc): + """ + Inserts a document (dictionary) into mongo database table + """ + print 'adding docment %s' % (doc) + try: + self._table.insert(doc) + except Exception,e: + return 'Error: %s' % e + + def get_document_by_keys(self, keys): + """ + Attempts to return a single document from database table that matches + each key/value in keys dictionary. + """ + print 'attempting to retrieve document using keys: %s' % keys + try: + return self._table.find_one(keys) + except Exception,e: + return 'Error: %s' % e + + def start(self): + context = zmq.Context() + socket = context.socket(zmq.ROUTER) + socket.bind(self._bind_addr) + while True: + msg = socket.recv_multipart() + print "Received msg: ", msg + if len(msg) != 3: + error_msg = 'invalid message received: %s' % msg + print error_msg + reply = [msg[0], error_msg] + socket.send_multipart(reply) + continue + id = msg[0] + operation = msg[1] + contents = json.loads(msg[2]) + # always send back the id with ROUTER + reply = [id] + if operation == 'add': + self.add_document(contents) + reply.append("success") + elif operation == 'get': + doc = self.get_document_by_keys(contents) + json_doc = self._doc_to_json(doc) + reply.append(json_doc) + else: + print 'unknown request' + socket.send_multipart(reply) + +def main(): + MongoZMQ('ipcontroller','jobs').start() + +if __name__ == "__main__": + main() diff --git a/examples/monitoring/simple_monitor.py b/examples/monitoring/simple_monitor.py new file mode 100644 index 0000000..88cdd4c --- /dev/null +++ b/examples/monitoring/simple_monitor.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +"""Simple example demonstrating the use of the socket monitoring feature.""" + +# This file is part of pyzmq. +# +# Distributed under the terms of the New BSD License. The full +# license is in the file COPYING.BSD, distributed as part of this +# software. +from __future__ import print_function + +__author__ = 'Guido Goldstein' + +import json +import os +import struct +import sys +import threading +import time + +import zmq +from zmq.utils.monitor import recv_monitor_message + +line = lambda : print('-' * 40) + +def logger(monitor): + done = False + while monitor.poll(timeout=5000): + evt = recv_monitor_message(monitor) + print(json.dumps(evt, indent=1)) + if evt['event'] == zmq.EVENT_MONITOR_STOPPED: + break + print() + print("Logger done!") + monitor.close() + +print("libzmq-%s" % zmq.zmq_version()) +if zmq.zmq_version_info() < (4,0): + raise RuntimeError("monitoring in libzmq version < 4.0 is not supported") + +print("Event names:") +for name in dir(zmq): + if name.startswith('EVENT_'): + print("%21s : %4i" % (name, getattr(zmq, name))) + + +ctx = zmq.Context().instance() +rep = ctx.socket(zmq.REP) +req = ctx.socket(zmq.REQ) + +monitor = req.get_monitor_socket() + +t = threading.Thread(target=logger, args=(monitor,)) +t.start() + +line() +print("bind req") +req.bind("tcp://127.0.0.1:6666") +req.bind("tcp://127.0.0.1:6667") +time.sleep(1) + +line() +print("connect rep") +rep.connect("tcp://127.0.0.1:6667") +time.sleep(0.2) +rep.connect("tcp://127.0.0.1:6666") +time.sleep(1) + +line() +print("disconnect rep") +rep.disconnect("tcp://127.0.0.1:6667") +time.sleep(1) +rep.disconnect("tcp://127.0.0.1:6666") +time.sleep(1) + +line() +print("close rep") +rep.close() +time.sleep(1) + +line() +print("close req") +req.close() +time.sleep(1) + +line() +print("joining") +t.join() + +print("END") +ctx.term() diff --git a/examples/poll/pair.py b/examples/poll/pair.py new file mode 100644 index 0000000..81c8b3a --- /dev/null +++ b/examples/poll/pair.py @@ -0,0 +1,56 @@ +"""A thorough test of polling PAIR sockets.""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import time +import zmq + +print "Running polling tests for PAIR sockets..." + +addr = 'tcp://127.0.0.1:5555' +ctx = zmq.Context() +s1 = ctx.socket(zmq.PAIR) +s2 = ctx.socket(zmq.PAIR) + +s1.bind(addr) +s2.connect(addr) + +# Sleep to allow sockets to connect. +time.sleep(1.0) + +poller = zmq.Poller() +poller.register(s1, zmq.POLLIN|zmq.POLLOUT) +poller.register(s2, zmq.POLLIN|zmq.POLLOUT) + +# Now make sure that both are send ready. +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT +assert socks[s2] == zmq.POLLOUT + +# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN +s1.send('msg1') +s2.send('msg2') +time.sleep(1.0) +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT|zmq.POLLIN +assert socks[s2] == zmq.POLLOUT|zmq.POLLIN + +# Make sure that both are in POLLOUT after recv. +s1.recv() +s2.recv() +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT +assert socks[s2] == zmq.POLLOUT + +poller.unregister(s1) +poller.unregister(s2) + +# Wait for everything to finish. +time.sleep(1.0) + +print "Finished." \ No newline at end of file diff --git a/examples/poll/pubsub.py b/examples/poll/pubsub.py new file mode 100644 index 0000000..a590fa9 --- /dev/null +++ b/examples/poll/pubsub.py @@ -0,0 +1,57 @@ +"""A thorough test of polling PUB/SUB sockets.""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import time +import zmq + +print "Running polling tets for PUB/SUB sockets..." + +addr = 'tcp://127.0.0.1:5555' +ctx = zmq.Context() +s1 = ctx.socket(zmq.PUB) +s2 = ctx.socket(zmq.SUB) +s2.setsockopt(zmq.SUBSCRIBE, '') + +s1.bind(addr) +s2.connect(addr) + +# Sleep to allow sockets to connect. +time.sleep(1.0) + +poller = zmq.Poller() +poller.register(s1, zmq.POLLIN|zmq.POLLOUT) +poller.register(s2, zmq.POLLIN|zmq.POLLOUT) + +# Now make sure that both are send ready. +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT +assert not socks.has_key(s2) + +# Make sure that s1 stays in POLLOUT after a send. +s1.send('msg1') +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT + +# Make sure that s2 is POLLIN after waiting. +time.sleep(0.5) +socks = dict(poller.poll()) +assert socks[s2] == zmq.POLLIN + +# Make sure that s2 goes into 0 after recv. +s2.recv() +socks = dict(poller.poll()) +assert not socks.has_key(s2) + +poller.unregister(s1) +poller.unregister(s2) + +# Wait for everything to finish. +time.sleep(1.0) + +print "Finished." diff --git a/examples/poll/reqrep.py b/examples/poll/reqrep.py new file mode 100644 index 0000000..ef4436c --- /dev/null +++ b/examples/poll/reqrep.py @@ -0,0 +1,71 @@ +"""A thorough test of polling REQ/REP sockets.""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import time +import zmq + +print "Running polling tests for REQ/REP sockets..." + +addr = 'tcp://127.0.0.1:5555' +ctx = zmq.Context() +s1 = ctx.socket(zmq.REP) +s2 = ctx.socket(zmq.REQ) + +s1.bind(addr) +s2.connect(addr) + +# Sleep to allow sockets to connect. +time.sleep(1.0) + +poller = zmq.Poller() +poller.register(s1, zmq.POLLIN|zmq.POLLOUT) +poller.register(s2, zmq.POLLIN|zmq.POLLOUT) + +# Make sure that s1 is in state 0 and s2 is in POLLOUT +socks = dict(poller.poll()) +assert not socks.has_key(s1) +assert socks[s2] == zmq.POLLOUT + +# Make sure that s2 goes immediately into state 0 after send. +s2.send('msg1') +socks = dict(poller.poll()) +assert not socks.has_key(s2) + +# Make sure that s1 goes into POLLIN state after a time.sleep(). +time.sleep(0.5) +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLIN + +# Make sure that s1 goes into POLLOUT after recv. +s1.recv() +socks = dict(poller.poll()) +assert socks[s1] == zmq.POLLOUT + +# Make sure s1 goes into state 0 after send. +s1.send('msg2') +socks = dict(poller.poll()) +assert not socks.has_key(s1) + +# Wait and then see that s2 is in POLLIN. +time.sleep(0.5) +socks = dict(poller.poll()) +assert socks[s2] == zmq.POLLIN + +# Make sure that s2 is in POLLOUT after recv. +s2.recv() +socks = dict(poller.poll()) +assert socks[s2] == zmq.POLLOUT + +poller.unregister(s1) +poller.unregister(s2) + +# Wait for everything to finish. +time.sleep(1.0) + +print "Finished." diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py new file mode 100644 index 0000000..a2ce6c9 --- /dev/null +++ b/examples/pubsub/publisher.py @@ -0,0 +1,57 @@ +"""A test that publishes NumPy arrays. + +Uses REQ/REP (on PUB/SUB socket + 1) to synchronize +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import time + +import zmq +import numpy + +def sync(bind_to): + # use bind socket + 1 + sync_with = ':'.join(bind_to.split(':')[:-1] + + [str(int(bind_to.split(':')[-1]) + 1)]) + ctx = zmq.Context.instance() + s = ctx.socket(zmq.REP) + s.bind(sync_with) + print "Waiting for subscriber to connect..." + s.recv() + print " Done." + s.send('GO') + +def main(): + if len (sys.argv) != 4: + print 'usage: publisher ' + sys.exit (1) + + try: + bind_to = sys.argv[1] + array_size = int(sys.argv[2]) + array_count = int (sys.argv[3]) + except (ValueError, OverflowError), e: + print 'array-size and array-count must be integers' + sys.exit (1) + + ctx = zmq.Context() + s = ctx.socket(zmq.PUB) + s.bind(bind_to) + + sync(bind_to) + + print "Sending arrays..." + for i in range(array_count): + a = numpy.random.rand(array_size, array_size) + s.send_pyobj(a) + print " Done." + +if __name__ == "__main__": + main() diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py new file mode 100644 index 0000000..b996ad8 --- /dev/null +++ b/examples/pubsub/subscriber.py @@ -0,0 +1,74 @@ +"""A test that subscribes to NumPy arrays. + +Uses REQ/REP (on PUB/SUB socket + 1) to synchronize +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + + +import sys +import time + +import zmq +import numpy + +def sync(connect_to): + # use connect socket + 1 + sync_with = ':'.join(connect_to.split(':')[:-1] + + [str(int(connect_to.split(':')[-1]) + 1)] + ) + ctx = zmq.Context.instance() + s = ctx.socket(zmq.REQ) + s.connect(sync_with) + s.send('READY') + s.recv() + +def main(): + if len (sys.argv) != 3: + print 'usage: subscriber ' + sys.exit (1) + + try: + connect_to = sys.argv[1] + array_count = int (sys.argv[2]) + except (ValueError, OverflowError), e: + print 'array-count must be integers' + sys.exit (1) + + ctx = zmq.Context() + s = ctx.socket(zmq.SUB) + s.connect(connect_to) + s.setsockopt(zmq.SUBSCRIBE,'') + + sync(connect_to) + + start = time.clock() + + print "Receiving arrays..." + for i in range(array_count): + a = s.recv_pyobj() + print " Done." + + end = time.clock() + + elapsed = (end - start) * 1000000 + if elapsed == 0: + elapsed = 1 + throughput = (1000000.0 * float (array_count)) / float (elapsed) + message_size = a.nbytes + megabits = float (throughput * message_size * 8) / 1000000 + + print "message size: %.0f [B]" % (message_size, ) + print "array count: %.0f" % (array_count, ) + print "mean throughput: %.0f [msg/s]" % (throughput, ) + print "mean throughput: %.3f [Mb/s]" % (megabits, ) + + time.sleep(1.0) + +if __name__ == "__main__": + main() diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py new file mode 100755 index 0000000..73b3d1c --- /dev/null +++ b/examples/pubsub/topics_pub.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +"""Simple example of publish/subscribe illustrating topics. + +Publisher and subscriber can be started in any order, though if publisher +starts first, any messages sent before subscriber starts are lost. More than +one subscriber can listen, and they can listen to different topics. + +Topic filtering is done simply on the start of the string, e.g. listening to +'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to +catch 'weather'. +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import itertools +import sys +import time + +import zmq + +def main(): + if len (sys.argv) != 2: + print 'usage: publisher ' + sys.exit (1) + + bind_to = sys.argv[1] + + all_topics = ['sports.general','sports.football','sports.basketball', + 'stocks.general','stocks.GOOG','stocks.AAPL', + 'weather'] + + ctx = zmq.Context() + s = ctx.socket(zmq.PUB) + s.bind(bind_to) + + print "Starting broadcast on topics:" + print " %s" % all_topics + print "Hit Ctrl-C to stop broadcasting." + print "Waiting so subscriber sockets can connect..." + print + time.sleep(1.0) + + msg_counter = itertools.count() + try: + for topic in itertools.cycle(all_topics): + msg_body = str(msg_counter.next()) + print ' Topic: %s, msg:%s' % (topic, msg_body) + s.send_multipart([topic, msg_body]) + # short wait so we don't hog the cpu + time.sleep(0.1) + except KeyboardInterrupt: + pass + + print "Waiting for message queues to flush..." + time.sleep(0.5) + print "Done." + +if __name__ == "__main__": + main() diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py new file mode 100755 index 0000000..4a61fb5 --- /dev/null +++ b/examples/pubsub/topics_sub.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +"""Simple example of publish/subscribe illustrating topics. + +Publisher and subscriber can be started in any order, though if publisher +starts first, any messages sent before subscriber starts are lost. More than +one subscriber can listen, and they can listen to different topics. + +Topic filtering is done simply on the start of the string, e.g. listening to +'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to +catch 'weather'. +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010 Brian Granger, Fernando Perez +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +import sys +import time + +import zmq +import numpy + +def main(): + if len (sys.argv) < 2: + print 'usage: subscriber [topic topic ...]' + sys.exit (1) + + connect_to = sys.argv[1] + topics = sys.argv[2:] + + ctx = zmq.Context() + s = ctx.socket(zmq.SUB) + s.connect(connect_to) + + # manage subscriptions + if not topics: + print "Receiving messages on ALL topics..." + s.setsockopt(zmq.SUBSCRIBE,'') + else: + print "Receiving messages on topics: %s ..." % topics + for t in topics: + s.setsockopt(zmq.SUBSCRIBE,t) + print + try: + while True: + topic, msg = s.recv_multipart() + print ' Topic: %s, msg:%s' % (topic, msg) + except KeyboardInterrupt: + pass + print "Done." + +if __name__ == "__main__": + main() diff --git a/examples/security/generate_certificates.py b/examples/security/generate_certificates.py new file mode 100644 index 0000000..80db258 --- /dev/null +++ b/examples/security/generate_certificates.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +""" +Generate client and server CURVE certificate files then move them into the +appropriate store directory, private_keys or public_keys. The certificates +generated by this script are used by the stonehouse and ironhouse examples. + +In practice this would be done by hand or some out-of-band process. + +Author: Chris Laws +""" + +import os +import shutil +import zmq.auth + +def generate_certificates(base_dir): + ''' Generate client and server CURVE certificate files''' + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + # Create directories for certificates, remove old content if necessary + for d in [keys_dir, public_keys_dir, secret_keys_dir]: + if os.path.exists(d): + shutil.rmtree(d) + os.mkdir(d) + + # create new keys in certificates dir + server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server") + client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client") + + # move public keys to appropriate directory + for key_file in os.listdir(keys_dir): + if key_file.endswith(".key"): + shutil.move(os.path.join(keys_dir, key_file), + os.path.join(public_keys_dir, '.')) + + # move secret keys to appropriate directory + for key_file in os.listdir(keys_dir): + if key_file.endswith(".key_secret"): + shutil.move(os.path.join(keys_dir, key_file), + os.path.join(secret_keys_dir, '.')) + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + generate_certificates(os.path.dirname(__file__)) diff --git a/examples/security/grasslands.py b/examples/security/grasslands.py new file mode 100644 index 0000000..cbd3ab9 --- /dev/null +++ b/examples/security/grasslands.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +''' +No protection at all. + +All connections are accepted, there is no authentication, and no privacy. + +This is how ZeroMQ always worked until we built security into the wire +protocol in early 2013. Internally, it uses a security mechanism called +"NULL". + +Author: Chris Laws +''' + +import zmq + + +ctx = zmq.Context().instance() + +server = ctx.socket(zmq.PUSH) +server.bind('tcp://*:9000') + +client = ctx.socket(zmq.PULL) +client.connect('tcp://127.0.0.1:9000') + +server.send(b"Hello") +msg = client.recv() +if msg == b"Hello": + print("Grasslands test OK") diff --git a/examples/security/ioloop-ironhouse.py b/examples/security/ioloop-ironhouse.py new file mode 100644 index 0000000..11fd400 --- /dev/null +++ b/examples/security/ioloop-ironhouse.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python + +''' +Ironhouse extends Stonehouse with client public key authentication. + +This is the strongest security model we have today, protecting against every +attack we know about, except end-point attacks (where an attacker plants +spyware on a machine to capture data before it's encrypted, or after it's +decrypted). + +This example demonstrates using the IOLoopAuthenticator. + +Author: Chris Laws +''' + +import logging +import os +import sys + +import zmq +import zmq.auth +from zmq.auth.ioloop import IOLoopAuthenticator +from zmq.eventloop import ioloop, zmqstream + +def echo(server, msg): + logging.debug("server recvd %s", msg) + reply = msg + [b'World'] + logging.debug("server sending %s", reply) + server.send_multipart(reply) + +def setup_server(server_secret_file, endpoint='tcp://127.0.0.1:9000'): + """setup a simple echo server with CURVE auth""" + server = zmq.Context.instance().socket(zmq.ROUTER) + + server_public, server_secret = zmq.auth.load_certificate(server_secret_file) + server.curve_secretkey = server_secret + server.curve_publickey = server_public + server.curve_server = True # must come before bind + server.bind(endpoint) + + server_stream = zmqstream.ZMQStream(server) + # simple echo + server_stream.on_recv_stream(echo) + return server_stream + +def client_msg_recvd(msg): + logging.debug("client recvd %s", msg) + logging.info("Ironhouse test OK") + # stop the loop when we get the reply + ioloop.IOLoop.instance().stop() + +def setup_client(client_secret_file, server_public_file, endpoint='tcp://127.0.0.1:9000'): + """setup a simple client with CURVE auth""" + + client = zmq.Context.instance().socket(zmq.DEALER) + + # We need two certificates, one for the client and one for + # the server. The client must know the server's public key + # to make a CURVE connection. + client_public, client_secret = zmq.auth.load_certificate(client_secret_file) + client.curve_secretkey = client_secret + client.curve_publickey = client_public + + server_public, _ = zmq.auth.load_certificate(server_public_file) + # The client must know the server's public key to make a CURVE connection. + client.curve_serverkey = server_public + client.connect(endpoint) + + client_stream = zmqstream.ZMQStream(client) + client_stream.on_recv(client_msg_recvd) + return client_stream + + +def run(): + '''Run Ironhouse example''' + + # These direcotries are generated by the generate_certificates script + base_dir = os.path.dirname(__file__) + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): + logging.critical("Certificates are missing - run generate_certificates script first") + sys.exit(1) + + # Start an authenticator for this context. + auth = IOLoopAuthenticator() + auth.allow('127.0.0.1') + # Tell authenticator to use the certificate in a directory + auth.configure_curve(domain='*', location=public_keys_dir) + + server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") + server = setup_server(server_secret_file) + server_public_file = os.path.join(public_keys_dir, "server.key") + client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") + client = setup_client(client_secret_file, server_public_file) + client.send(b'Hello') + + auth.start() + ioloop.IOLoop.instance().start() + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() diff --git a/examples/security/ironhouse.py b/examples/security/ironhouse.py new file mode 100644 index 0000000..5d9faf1 --- /dev/null +++ b/examples/security/ironhouse.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +''' +Ironhouse extends Stonehouse with client public key authentication. + +This is the strongest security model we have today, protecting against every +attack we know about, except end-point attacks (where an attacker plants +spyware on a machine to capture data before it's encrypted, or after it's +decrypted). + +Author: Chris Laws +''' + +import logging +import os +import sys + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + + +def run(): + ''' Run Ironhouse example ''' + + # These direcotries are generated by the generate_certificates script + base_dir = os.path.dirname(__file__) + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): + logging.critical("Certificates are missing - run generate_certificates.py script first") + sys.exit(1) + + ctx = zmq.Context().instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + auth.allow('127.0.0.1') + # Tell authenticator to use the certificate in a directory + auth.configure_curve(domain='*', location=public_keys_dir) + + server = ctx.socket(zmq.PUSH) + + server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") + server_public, server_secret = zmq.auth.load_certificate(server_secret_file) + server.curve_secretkey = server_secret + server.curve_publickey = server_public + server.curve_server = True # must come before bind + server.bind('tcp://*:9000') + + client = ctx.socket(zmq.PULL) + + # We need two certificates, one for the client and one for + # the server. The client must know the server's public key + # to make a CURVE connection. + client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") + client_public, client_secret = zmq.auth.load_certificate(client_secret_file) + client.curve_secretkey = client_secret + client.curve_publickey = client_public + + server_public_file = os.path.join(public_keys_dir, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + # The client must know the server's public key to make a CURVE connection. + client.curve_serverkey = server_public + client.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + if client.poll(1000): + msg = client.recv() + if msg == b"Hello": + logging.info("Ironhouse test OK") + else: + logging.error("Ironhouse test FAIL") + + # stop auth thread + auth.stop() + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() diff --git a/examples/security/stonehouse.py b/examples/security/stonehouse.py new file mode 100644 index 0000000..276e87a --- /dev/null +++ b/examples/security/stonehouse.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +''' +Stonehouse uses the "CURVE" security mechanism. + +This gives us strong encryption on data, and (as far as we know) unbreakable +authentication. Stonehouse is the minimum you would use over public networks, +and assures clients that they are speaking to an authentic server, while +allowing any client to connect. + +Author: Chris Laws +''' + +import logging +import os +import sys +import time + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + + +def run(): + ''' Run Stonehouse example ''' + + # These directories are generated by the generate_certificates script + base_dir = os.path.dirname(__file__) + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): + logging.critical("Certificates are missing: run generate_certificates.py script first") + sys.exit(1) + + ctx = zmq.Context().instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + auth.allow('127.0.0.1') + # Tell the authenticator how to handle CURVE requests + auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) + + server = ctx.socket(zmq.PUSH) + server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") + server_public, server_secret = zmq.auth.load_certificate(server_secret_file) + server.curve_secretkey = server_secret + server.curve_publickey = server_public + server.curve_server = True # must come before bind + server.bind('tcp://*:9000') + + client = ctx.socket(zmq.PULL) + # We need two certificates, one for the client and one for + # the server. The client must know the server's public key + # to make a CURVE connection. + client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") + client_public, client_secret = zmq.auth.load_certificate(client_secret_file) + client.curve_secretkey = client_secret + client.curve_publickey = client_public + + # The client must know the server's public key to make a CURVE connection. + server_public_file = os.path.join(public_keys_dir, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + client.curve_serverkey = server_public + + client.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + if client.poll(1000): + msg = client.recv() + if msg == b"Hello": + logging.info("Stonehouse test OK") + else: + logging.error("Stonehouse test FAIL") + + # stop auth thread + auth.stop() + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() diff --git a/examples/security/strawhouse.py b/examples/security/strawhouse.py new file mode 100644 index 0000000..dc75bd7 --- /dev/null +++ b/examples/security/strawhouse.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +''' +Allow or deny clients based on IP address. + +Strawhouse, which is plain text with filtering on IP addresses. It still +uses the NULL mechanism, but we install an authentication hook that checks +the IP address against a whitelist or blacklist and allows or denies it +accordingly. + +Author: Chris Laws +''' + +import logging +import sys + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + + +def run(): + '''Run strawhouse client''' + + allow_test_pass = False + deny_test_pass = False + + ctx = zmq.Context().instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + + # Part 1 - demonstrate allowing clients based on IP address + auth.allow('127.0.0.1') + + server = ctx.socket(zmq.PUSH) + server.zap_domain = b'global' # must come before bind + server.bind('tcp://*:9000') + + client_allow = ctx.socket(zmq.PULL) + client_allow.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + msg = client_allow.recv() + if msg == b"Hello": + allow_test_pass = True + + client_allow.close() + + # Part 2 - demonstrate denying clients based on IP address + auth.stop() + + auth = ThreadAuthenticator(ctx) + auth.start() + + auth.deny('127.0.0.1') + + client_deny = ctx.socket(zmq.PULL) + client_deny.connect('tcp://127.0.0.1:9000') + + if server.poll(50, zmq.POLLOUT): + server.send(b"Hello") + + if client_deny.poll(50): + msg = client_deny.recv() + else: + deny_test_pass = True + else: + deny_test_pass = True + + client_deny.close() + + auth.stop() # stop auth thread + + if allow_test_pass and deny_test_pass: + logging.info("Strawhouse test OK") + else: + logging.error("Strawhouse test FAIL") + + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() diff --git a/examples/security/woodhouse.py b/examples/security/woodhouse.py new file mode 100644 index 0000000..efedee4 --- /dev/null +++ b/examples/security/woodhouse.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python + +''' +Woodhouse extends Strawhouse with a name and password check. + +This uses the PLAIN mechanism which does plain-text username and password authentication). +It's not really secure, and anyone sniffing the network (trivial with WiFi) +can capture passwords and then login. + +Author: Chris Laws +''' + +import logging +import sys + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + +def run(): + '''Run woodhouse example''' + + valid_client_test_pass = False + invalid_client_test_pass = False + + ctx = zmq.Context().instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + auth.allow('127.0.0.1') + # Instruct authenticator to handle PLAIN requests + auth.configure_plain(domain='*', passwords={'admin': 'secret'}) + + server = ctx.socket(zmq.PUSH) + server.plain_server = True # must come before bind + server.bind('tcp://*:9000') + + client = ctx.socket(zmq.PULL) + client.plain_username = b'admin' + client.plain_password = b'secret' + client.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + if client.poll(): + msg = client.recv() + if msg == b"Hello": + valid_client_test_pass = True + + client.close() + + + # now use invalid credentials - expect no msg received + client2 = ctx.socket(zmq.PULL) + client2.plain_username = b'admin' + client2.plain_password = b'bogus' + client2.connect('tcp://127.0.0.1:9000') + + server.send(b"World") + + if client2.poll(50): + msg = client.recv() + if msg == "World": + invalid_client_test_pass = False + else: + # no message is expected + invalid_client_test_pass = True + + # stop auth thread + auth.stop() + + if valid_client_test_pass and invalid_client_test_pass: + logging.info("Woodhouse test OK") + else: + logging.error("Woodhouse test FAIL") + + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4,0): + raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() diff --git a/examples/serialization/serialsocket.py b/examples/serialization/serialsocket.py new file mode 100644 index 0000000..7329bb9 --- /dev/null +++ b/examples/serialization/serialsocket.py @@ -0,0 +1,74 @@ +"""A Socket subclass that adds some serialization methods.""" + +import zlib +import pickle + +import numpy + +import zmq + +class SerializingSocket(zmq.Socket): + """A class with some extra serialization methods + + send_zipped_pickle is just like send_pyobj, but uses + zlib to compress the stream before sending. + + send_array sends numpy arrays with metadata necessary + for reconstructing the array on the other side (dtype,shape). + """ + + def send_zipped_pickle(self, obj, flags=0, protocol=-1): + """pack and compress an object with pickle and zlib.""" + pobj = pickle.dumps(obj, protocol) + zobj = zlib.compress(pobj) + print('zipped pickle is %i bytes' % len(zobj)) + return self.send(zobj, flags=flags) + + def recv_zipped_pickle(self, flags=0): + """reconstruct a Python object sent with zipped_pickle""" + zobj = self.recv(flags) + pobj = zlib.decompress(zobj) + return pickle.loads(pobj) + + def send_array(self, A, flags=0, copy=True, track=False): + """send a numpy array with metadata""" + md = dict( + dtype = str(A.dtype), + shape = A.shape, + ) + self.send_json(md, flags|zmq.SNDMORE) + return self.send(A, flags, copy=copy, track=track) + + def recv_array(self, flags=0, copy=True, track=False): + """recv a numpy array""" + md = self.recv_json(flags=flags) + msg = self.recv(flags=flags, copy=copy, track=track) + A = numpy.frombuffer(msg, dtype=md['dtype']) + return A.reshape(md['shape']) + +class SerializingContext(zmq.Context): + _socket_class = SerializingSocket + +def main(): + ctx = SerializingContext() + req = ctx.socket(zmq.REQ) + rep = ctx.socket(zmq.REP) + + rep.bind('inproc://a') + req.connect('inproc://a') + A = numpy.ones((1024,1024)) + print ("Array is %i bytes" % (len(A) * 8)) + + # send/recv with pickle+zip + req.send_zipped_pickle(A) + B = rep.recv_zipped_pickle() + # now try non-copying version + rep.send_array(A, copy=False) + C = req.recv_array(copy=False) + print ("Checking zipped pickle...") + print ("Okay" if (A==B).all() else "Failed") + print ("Checking send_array...") + print ("Okay" if (C==B).all() else "Failed") + +if __name__ == '__main__': + main() \ No newline at end of file -- cgit v1.2.3