From 44be832c5708baadd146cb954befbc3dcad8d463 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:52:45 -0500 Subject: prepare for upgrade to new upstream --- examples/LICENSE | 3 - examples/README_PY3K | 10 --- examples/bench/benchmark.py | 25 ------- examples/bench/jsonrpc_client.py | 4 - examples/bench/jsonrpc_server.py | 8 -- examples/bench/latency.png | Bin 58452 -> 0 bytes examples/bench/msgs_sec.png | Bin 56500 -> 0 bytes examples/bench/msgs_sec_log.png | Bin 59966 -> 0 bytes examples/bench/msgs_sec_ratio.png | Bin 39876 -> 0 bytes examples/bench/plot_latency.py | 84 --------------------- examples/bench/pyro_client.py | 3 - examples/bench/pyro_server.py | 14 ---- examples/bench/pyzmq_client.py | 16 ---- examples/bench/pyzmq_server.py | 10 --- examples/bench/xmlrpc_client.py | 6 -- examples/bench/xmlrpc_server.py | 8 -- examples/chat/display.py | 41 ----------- examples/chat/prompt.py | 39 ---------- examples/device/client.py | 38 ---------- examples/device/server.py | 52 ------------- examples/eventloop/asyncweb.py | 96 ------------------------ examples/eventloop/echo.py | 27 ------- examples/eventloop/echostream.py | 24 ------ examples/eventloop/web.py | 46 ------------ examples/gevent/poll.py | 42 ----------- examples/gevent/reqrep.py | 47 ------------ examples/gevent/simple.py | 37 ---------- examples/heartbeat/heart.py | 34 --------- examples/heartbeat/heartbeater.py | 90 ----------------------- examples/heartbeat/ping.py | 34 --------- examples/heartbeat/pong.py | 34 --------- examples/logger/zmqlogger.py | 70 ------------------ examples/mongodb/client.py | 46 ------------ examples/mongodb/controller.py | 91 ----------------------- examples/monitoring/simple_monitor.py | 90 ----------------------- examples/poll/pair.py | 56 -------------- examples/poll/pubsub.py | 57 --------------- examples/poll/reqrep.py | 71 ------------------ examples/pubsub/publisher.py | 57 --------------- examples/pubsub/subscriber.py | 74 ------------------- examples/pubsub/topics_pub.py | 64 ---------------- examples/pubsub/topics_sub.py | 56 -------------- examples/security/generate_certificates.py | 49 ------------- examples/security/grasslands.py | 29 -------- examples/security/ioloop-ironhouse.py | 114 ----------------------------- examples/security/ironhouse.py | 93 ----------------------- examples/security/stonehouse.py | 93 ----------------------- examples/security/strawhouse.py | 94 ------------------------ examples/security/woodhouse.py | 90 ----------------------- examples/serialization/serialsocket.py | 74 ------------------- 50 files changed, 2240 deletions(-) delete mode 100644 examples/LICENSE delete mode 100644 examples/README_PY3K delete mode 100644 examples/bench/benchmark.py delete mode 100644 examples/bench/jsonrpc_client.py delete mode 100644 examples/bench/jsonrpc_server.py delete mode 100644 examples/bench/latency.png delete mode 100644 examples/bench/msgs_sec.png delete mode 100644 examples/bench/msgs_sec_log.png delete mode 100644 examples/bench/msgs_sec_ratio.png delete mode 100644 examples/bench/plot_latency.py delete mode 100644 examples/bench/pyro_client.py delete mode 100644 examples/bench/pyro_server.py delete mode 100644 examples/bench/pyzmq_client.py delete mode 100644 examples/bench/pyzmq_server.py delete mode 100644 examples/bench/xmlrpc_client.py delete mode 100644 examples/bench/xmlrpc_server.py delete mode 100644 examples/chat/display.py delete mode 100644 examples/chat/prompt.py delete mode 100644 examples/device/client.py delete mode 100644 examples/device/server.py delete mode 100644 examples/eventloop/asyncweb.py delete mode 100644 examples/eventloop/echo.py delete mode 100644 examples/eventloop/echostream.py delete mode 100644 examples/eventloop/web.py delete mode 100644 examples/gevent/poll.py delete mode 100644 examples/gevent/reqrep.py delete mode 100644 examples/gevent/simple.py delete mode 100644 examples/heartbeat/heart.py delete mode 100644 examples/heartbeat/heartbeater.py delete mode 100644 examples/heartbeat/ping.py delete mode 100644 examples/heartbeat/pong.py delete mode 100644 examples/logger/zmqlogger.py delete mode 100644 examples/mongodb/client.py delete mode 100644 examples/mongodb/controller.py delete mode 100644 examples/monitoring/simple_monitor.py delete mode 100644 examples/poll/pair.py delete mode 100644 examples/poll/pubsub.py delete mode 100644 examples/poll/reqrep.py delete mode 100644 examples/pubsub/publisher.py delete mode 100644 examples/pubsub/subscriber.py delete mode 100755 examples/pubsub/topics_pub.py delete mode 100755 examples/pubsub/topics_sub.py delete mode 100644 examples/security/generate_certificates.py delete mode 100644 examples/security/grasslands.py delete mode 100644 examples/security/ioloop-ironhouse.py delete mode 100644 examples/security/ironhouse.py delete mode 100644 examples/security/stonehouse.py delete mode 100644 examples/security/strawhouse.py delete mode 100644 examples/security/woodhouse.py delete mode 100644 examples/serialization/serialsocket.py (limited to 'examples') diff --git a/examples/LICENSE b/examples/LICENSE deleted file mode 100644 index d4d3950..0000000 --- a/examples/LICENSE +++ /dev/null @@ -1,3 +0,0 @@ -PyZMQ examples are copyright their respective authors, and licensed -under the New BSD License as described in COPYING.BSD unless otherwise -specified in the file. \ No newline at end of file diff --git a/examples/README_PY3K b/examples/README_PY3K deleted file mode 100644 index d5272d0..0000000 --- a/examples/README_PY3K +++ /dev/null @@ -1,10 +0,0 @@ -These examples use Python2 syntax. Due to the change in Python from bytestring str objects -to unicode str objects, 2to3 does not perform an adequate transform of the code. Examples -can be valid on both Python2.5 and Python3, but such code is less readable than it should be. - -As a result, the Python3 examples are kept in a separate repo: - -https://github.com/minrk/pyzmq-py3k-examples - - -The differences are very small, but important. \ No newline at end of file diff --git a/examples/bench/benchmark.py b/examples/bench/benchmark.py deleted file mode 100644 index c379af9..0000000 --- a/examples/bench/benchmark.py +++ /dev/null @@ -1,25 +0,0 @@ -from timeit import default_timer as timer - -def benchmark(f, size, reps): - msg = size*'0' - t1 = timer() - for i in range(reps): - msg2 = f(msg) - assert msg == msg2 - t2 = timer() - diff = (t2-t1) - latency = diff/reps - return latency*1000000 - -kB = [1000*2**n for n in range(10)] -MB = [1000000*2**n for n in range(8)] -sizes = [1] + kB + MB - -def benchmark_set(f, sizes, reps): - latencies = [] - for size, rep in zip(sizes, reps): - print "Running benchmark with %r reps of %r bytes" % (rep, size) - lat = benchmark(f, size, rep) - latencies.append(lat) - return sizes, latencies - diff --git a/examples/bench/jsonrpc_client.py b/examples/bench/jsonrpc_client.py deleted file mode 100644 index 7fb6ef4..0000000 --- a/examples/bench/jsonrpc_client.py +++ /dev/null @@ -1,4 +0,0 @@ -from timeit import default_timer as timer -from jsonrpclib import Server - -client = Server('http://localhost:10000') diff --git a/examples/bench/jsonrpc_server.py b/examples/bench/jsonrpc_server.py deleted file mode 100644 index 4500a02..0000000 --- a/examples/bench/jsonrpc_server.py +++ /dev/null @@ -1,8 +0,0 @@ -from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer - -def echo(x): - return x - -server = SimpleJSONRPCServer(('localhost',10000)) -server.register_function(echo) -server.serve_forever() \ No newline at end of file diff --git a/examples/bench/latency.png b/examples/bench/latency.png deleted file mode 100644 index bb414b5..0000000 Binary files a/examples/bench/latency.png and /dev/null differ diff --git a/examples/bench/msgs_sec.png b/examples/bench/msgs_sec.png deleted file mode 100644 index a7b294b..0000000 Binary files a/examples/bench/msgs_sec.png and /dev/null differ diff --git a/examples/bench/msgs_sec_log.png b/examples/bench/msgs_sec_log.png deleted file mode 100644 index c3a361e..0000000 Binary files a/examples/bench/msgs_sec_log.png and /dev/null differ diff --git a/examples/bench/msgs_sec_ratio.png b/examples/bench/msgs_sec_ratio.png deleted file mode 100644 index 0a87331..0000000 Binary files a/examples/bench/msgs_sec_ratio.png and /dev/null differ diff --git a/examples/bench/plot_latency.py b/examples/bench/plot_latency.py deleted file mode 100644 index f50ef29..0000000 --- a/examples/bench/plot_latency.py +++ /dev/null @@ -1,84 +0,0 @@ -"""Plot latency data from messaging benchmarks. - -To generate the data for each library, I started the server and then did -the following for each client:: - - from xmlrpc_client import client - for i in range(9): - s = '0'*10**i - print s - %timeit client.echo(s) -""" - -from matplotlib.pylab import * - -rawdata = """# Data in milliseconds -Bytes JSONRPC PYRO XMLRPC pyzmq_copy pyzmq_nocopy -1 2.15 0.186 2.07 0.111 0.136 -10 2.49 0.187 1.87 0.115 0.137 -100 2.5 0.189 1.9 0.126 0.138 -1000 2.54 0.196 1.91 0.129 0.141 -10000 2.91 0.271 2.77 0.204 0.197 -100000 6.65 1.44 9.17 0.961 0.546 -1000000 50.2 15.8 81.5 8.39 2.25 -10000000 491 159 816 91.7 25.2 -100000000 5010 1560 8300 893 248 - -""" -with open('latency.csv','w') as f: - f.writelines(rawdata) - -data = csv2rec('latency.csv',delimiter='\t') - -loglog(data.bytes, data.xmlrpc*1000, label='XMLRPC') -loglog(data.bytes, data.jsonrpc*1000, label='JSONRPC') -loglog(data.bytes, data.pyro*1000, label='Pyro') -loglog(data.bytes, data.pyzmq_nocopy*1000, label='PyZMQ') -loglog(data.bytes, len(data.bytes)*[60], label='Ping') -legend(loc=2) -title('Latency') -xlabel('Number of bytes') -ylabel('Round trip latency ($\mu s$)') -grid(True) -show() -savefig('latency.png') - -clf() - -semilogx(data.bytes, 1000/data.xmlrpc, label='XMLRPC') -semilogx(data.bytes, 1000/data.jsonrpc, label='JSONRPC') -semilogx(data.bytes, 1000/data.pyro, label='Pyro') -semilogx(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ') -legend(loc=1) -xlabel('Number of bytes') -ylabel('Message/s') -title('Message Throughput') -grid(True) -show() -savefig('msgs_sec.png') - -clf() - -loglog(data.bytes, 1000/data.xmlrpc, label='XMLRPC') -loglog(data.bytes, 1000/data.jsonrpc, label='JSONRPC') -loglog(data.bytes, 1000/data.pyro, label='Pyro') -loglog(data.bytes, 1000/data.pyzmq_nocopy, label='PyZMQ') -legend(loc=3) -xlabel('Number of bytes') -ylabel('Message/s') -title('Message Throughput') -grid(True) -show() -savefig('msgs_sec_log.png') - -clf() - -semilogx(data.bytes, data.pyro/data.pyzmq_nocopy, label="No-copy") -semilogx(data.bytes, data.pyro/data.pyzmq_copy, label="Copy") -xlabel('Number of bytes') -ylabel('Ratio throughputs') -title('PyZMQ Throughput/Pyro Throughput') -grid(True) -legend(loc=2) -show() -savefig('msgs_sec_ratio.png') diff --git a/examples/bench/pyro_client.py b/examples/bench/pyro_client.py deleted file mode 100644 index 5e25feb..0000000 --- a/examples/bench/pyro_client.py +++ /dev/null @@ -1,3 +0,0 @@ -import Pyro.core - -client = Pyro.core.getProxyForURI("PYROLOC://localhost:7766/echo") \ No newline at end of file diff --git a/examples/bench/pyro_server.py b/examples/bench/pyro_server.py deleted file mode 100644 index a2a2446..0000000 --- a/examples/bench/pyro_server.py +++ /dev/null @@ -1,14 +0,0 @@ -import Pyro.core - -class Echo(Pyro.core.ObjBase): - def __init__(self): - Pyro.core.ObjBase.__init__(self) - def echo(self, x): - return x - -Pyro.core.initServer() -daemon=Pyro.core.Daemon() -uri=daemon.connect(Echo(),"echo") - -daemon.requestLoop() - \ No newline at end of file diff --git a/examples/bench/pyzmq_client.py b/examples/bench/pyzmq_client.py deleted file mode 100644 index 9afccec..0000000 --- a/examples/bench/pyzmq_client.py +++ /dev/null @@ -1,16 +0,0 @@ -import zmq - -c = zmq.Context() -s = c.socket(zmq.REQ) -s.connect('tcp://127.0.0.1:10001') - -def echo(msg): - s.send(msg, copy=False) - msg2 = s.recv(copy=False) - return msg2 - -class Client(object): - pass - -client = Client() -client.echo = echo diff --git a/examples/bench/pyzmq_server.py b/examples/bench/pyzmq_server.py deleted file mode 100644 index cab0082..0000000 --- a/examples/bench/pyzmq_server.py +++ /dev/null @@ -1,10 +0,0 @@ -import zmq - -c = zmq.Context() -s = c.socket(zmq.REP) -s.bind('tcp://127.0.0.1:10001') - -while True: - msg = s.recv(copy=False) - s.send(msg) - diff --git a/examples/bench/xmlrpc_client.py b/examples/bench/xmlrpc_client.py deleted file mode 100644 index a73ddfd..0000000 --- a/examples/bench/xmlrpc_client.py +++ /dev/null @@ -1,6 +0,0 @@ -from timeit import default_timer as timer -from xmlrpclib import ServerProxy - -client = ServerProxy('http://localhost:10002') - - \ No newline at end of file diff --git a/examples/bench/xmlrpc_server.py b/examples/bench/xmlrpc_server.py deleted file mode 100644 index 24ab019..0000000 --- a/examples/bench/xmlrpc_server.py +++ /dev/null @@ -1,8 +0,0 @@ -from SimpleXMLRPCServer import SimpleXMLRPCServer - -def echo(x): - return x - -server = SimpleXMLRPCServer(('localhost',10002)) -server.register_function(echo) -server.serve_forever() \ No newline at end of file diff --git a/examples/chat/display.py b/examples/chat/display.py deleted file mode 100644 index d4e240a..0000000 --- a/examples/chat/display.py +++ /dev/null @@ -1,41 +0,0 @@ -"""The display part of a simply two process chat app.""" - -# -# Copyright (c) 2010 Andrew Gwozdziewycz -# -# This file is part of pyzmq. -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . - -import zmq - -def main(addrs): - - context = zmq.Context() - socket = context.socket(zmq.SUB) - socket.setsockopt(zmq.SUBSCRIBE, "") - for addr in addrs: - print "Connecting to: ", addr - socket.connect(addr) - - while True: - msg = socket.recv_pyobj() - print "%s: %s" % (msg[1], msg[0]) - -if __name__ == '__main__': - import sys - if len(sys.argv) < 2: - print "usage: display.py
[,
...]" - raise SystemExit - main(sys.argv[1:]) diff --git a/examples/chat/prompt.py b/examples/chat/prompt.py deleted file mode 100644 index d9b12ec..0000000 --- a/examples/chat/prompt.py +++ /dev/null @@ -1,39 +0,0 @@ -"""The prompt part of a simply two process chat app.""" - -# -# Copyright (c) 2010 Andrew Gwozdziewycz -# -# This file is part of pyzmq. -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . - -import zmq - -def main(addr, who): - - ctx = zmq.Context() - socket = ctx.socket(zmq.PUB) - socket.bind(addr) - - while True: - msg = raw_input("%s> " % who) - socket.send_pyobj((msg, who)) - - -if __name__ == '__main__': - import sys - if len(sys.argv) != 3: - print "usage: prompt.py
" - raise SystemExit - main(sys.argv[1], sys.argv[2]) diff --git a/examples/device/client.py b/examples/device/client.py deleted file mode 100644 index 14a4e26..0000000 --- a/examples/device/client.py +++ /dev/null @@ -1,38 +0,0 @@ -"""A client for the device based server.""" - -# -# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov -# -# This file is part of pyzmq. -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . - -import zmq -import os -from time import time - -print 'Client', os.getpid() - -context = zmq.Context(1) - -socket = context.socket(zmq.REQ) -socket.connect('tcp://127.0.0.1:5555') - -while True: - data = zmq.Message(str(os.getpid())) - start = time() - socket.send(data) - data = socket.recv() - print time()-start, data - diff --git a/examples/device/server.py b/examples/device/server.py deleted file mode 100644 index b29a3c1..0000000 --- a/examples/device/server.py +++ /dev/null @@ -1,52 +0,0 @@ -"""A device based server.""" - -# -# Copyright (c) 2010 Brian E. Granger and Eugene Chernyshov -# -# This file is part of pyzmq. -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . - -import zmq -import os -import threading -import time - -print 'Server', os.getpid() - -def routine(context): - socket = context.socket(zmq.REP) - - socket.connect("inproc://workers") - - while True: - message = socket.recv() - time.sleep(1) - socket.send(message) - -context = zmq.Context(1) - -workers = context.socket(zmq.DEALER) -workers.bind("inproc://workers"); - -clients = context.socket(zmq.DEALER) -clients.bind('tcp://127.0.0.1:5555') - -for i in range(10): - thread = threading.Thread(target=routine, args=(context, )) - thread.start() - -zmq.device(zmq.QUEUE, clients, workers) - -print "Finished" diff --git a/examples/eventloop/asyncweb.py b/examples/eventloop/asyncweb.py deleted file mode 100644 index 06b03f3..0000000 --- a/examples/eventloop/asyncweb.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Async web request example with tornado. - -Requests to localhost:8888 will be relayed via 0MQ to a slow responder, -who will take 1-5 seconds to respond. The tornado app will remain responsive -duriung this time, and when the worker replies, the web request will finish. - -A '.' is printed every 100ms to demonstrate that the zmq request is not blocking -the event loop. -""" - - -import sys -import random -import threading -import time - -import zmq -from zmq.eventloop import ioloop, zmqstream - -""" -ioloop.install() must be called prior to instantiating *any* tornado objects, -and ideally before importing anything from tornado, just to be safe. - -install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's -IOLoop. If this is not done properly, multiple IOLoop instances may be -created, which will have the effect of some subset of handlers never being -called, because only one loop will be running. -""" - -ioloop.install() - -import tornado -from tornado import web - - -def slow_responder(): - """thread for slowly responding to replies.""" - ctx = zmq.Context.instance() - socket = ctx.socket(zmq.REP) - socket.linger = 0 - socket.bind('tcp://127.0.0.1:5555') - i=0 - while True: - msg = socket.recv() - print "\nworker received %r\n" % msg - time.sleep(random.randint(1,5)) - socket.send(msg + " to you too, #%i" % i) - i+=1 - -def dot(): - """callback for showing that IOLoop is still responsive while we wait""" - sys.stdout.write('.') - sys.stdout.flush() - -def printer(msg): - print (msg) - -class TestHandler(web.RequestHandler): - - @web.asynchronous - def get(self): - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REQ) - s.connect('tcp://127.0.0.1:5555') - # send request to worker - s.send('hello') - loop = ioloop.IOLoop.instance() - self.stream = zmqstream.ZMQStream(s) - self.stream.on_recv(self.handle_reply) - - def handle_reply(self, msg): - # finish web request with worker's reply - reply = msg[0] - print "\nfinishing with %r\n" % reply, - self.stream.close() - self.write(reply) - self.finish() - -def main(): - worker = threading.Thread(target=slow_responder) - worker.daemon=True - worker.start() - - application = web.Application([(r"/", TestHandler)]) - beat = ioloop.PeriodicCallback(dot, 100) - beat.start() - application.listen(8888) - try: - ioloop.IOLoop.instance().start() - except KeyboardInterrupt: - print ' Interrupted' - - -if __name__ == "__main__": - main() - diff --git a/examples/eventloop/echo.py b/examples/eventloop/echo.py deleted file mode 100644 index 9be079b..0000000 --- a/examples/eventloop/echo.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python -"""A trivial ZMQ echo server using the eventloop. - -Authors -------- -* MinRK -""" - -import zmq -from zmq.eventloop import ioloop - -loop = ioloop.IOLoop.instance() - -ctx = zmq.Context() -s = ctx.socket(zmq.REP) -s.bind('tcp://127.0.0.1:5555') - -def rep_handler(sock, events): - # We don't know how many recv's we can do? - msg = sock.recv() - # No guarantee that we can do the send. We need a way of putting the - # send in the event loop. - sock.send(msg) - -loop.add_handler(s, rep_handler, zmq.POLLIN) - -loop.start() \ No newline at end of file diff --git a/examples/eventloop/echostream.py b/examples/eventloop/echostream.py deleted file mode 100644 index 04c1532..0000000 --- a/examples/eventloop/echostream.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -"""Adapted echo.py to put the send in the event loop using a ZMQStream. - -Authors -------- -* MinRK -""" - -import zmq -from zmq.eventloop import ioloop, zmqstream -loop = ioloop.IOLoop.instance() - -ctx = zmq.Context() -s = ctx.socket(zmq.REP) -s.bind('tcp://127.0.0.1:5555') -stream = zmqstream.ZMQStream(s, loop) - -def echo(msg): - print " ".join(msg) - stream.send_multipart(msg) - -stream.on_recv(echo) - -loop.start() \ No newline at end of file diff --git a/examples/eventloop/web.py b/examples/eventloop/web.py deleted file mode 100644 index 1285f95..0000000 --- a/examples/eventloop/web.py +++ /dev/null @@ -1,46 +0,0 @@ -import zmq -from zmq.eventloop import ioloop, zmqstream - -""" -ioloop.install() must be called prior to instantiating *any* tornado objects, -and ideally before importing anything from tornado, just to be safe. - -install() sets the singleton instance of tornado.ioloop.IOLoop with zmq's -IOLoop. If this is not done properly, multiple IOLoop instances may be -created, which will have the effect of some subset of handlers never being -called, because only one loop will be running. -""" - -ioloop.install() - -import tornado -import tornado.web - - -""" -this application can be used with echostream.py, start echostream.py, -start web.py, then every time you hit http://localhost:8888/, -echostream.py will print out 'hello' -""" - -def printer(msg): - print (msg) - -ctx = zmq.Context() -s = ctx.socket(zmq.REQ) -s.connect('tcp://127.0.0.1:5555') -stream = zmqstream.ZMQStream(s) -stream.on_recv(printer) - -class TestHandler(tornado.web.RequestHandler): - def get(self): - print ("sending hello") - stream.send("hello") - self.write("hello") -application = tornado.web.Application([(r"/", TestHandler)]) - -if __name__ == "__main__": - application.listen(8888) - ioloop.IOLoop.instance().start() - - diff --git a/examples/gevent/poll.py b/examples/gevent/poll.py deleted file mode 100644 index 1daf80a..0000000 --- a/examples/gevent/poll.py +++ /dev/null @@ -1,42 +0,0 @@ -import gevent -from zmq import green as zmq - -# Connect to both receiving sockets and send 10 messages -def sender(): - - sender = context.socket(zmq.PUSH) - sender.connect('inproc://polltest1') - sender.connect('inproc://polltest2') - - for i in xrange(10): - sender.send('test %d' % i) - gevent.sleep(1) - - -# create zmq context, and bind to pull sockets -context = zmq.Context() -receiver1 = context.socket(zmq.PULL) -receiver1.bind('inproc://polltest1') -receiver2 = context.socket(zmq.PULL) -receiver2.bind('inproc://polltest2') - -gevent.spawn(sender) - -# Create poller and register both reciever sockets -poller = zmq.Poller() -poller.register(receiver1, zmq.POLLIN) -poller.register(receiver2, zmq.POLLIN) - -# Read 10 messages from both reciever sockets -msgcnt = 0 -while msgcnt < 10: - socks = dict(poller.poll()) - if receiver1 in socks and socks[receiver1] == zmq.POLLIN: - print "Message from receiver1: %s" % receiver1.recv() - msgcnt += 1 - - if receiver2 in socks and socks[receiver2] == zmq.POLLIN: - print "Message from receiver2: %s" % receiver2.recv() - msgcnt += 1 - -print "%d messages received" % msgcnt diff --git a/examples/gevent/reqrep.py b/examples/gevent/reqrep.py deleted file mode 100644 index 2a4f307..0000000 --- a/examples/gevent/reqrep.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -Complex example which is a combination of the rr* examples from the zguide. -""" -from gevent import spawn -import zmq.green as zmq - -# server -context = zmq.Context() -socket = context.socket(zmq.REP) -socket.connect("tcp://localhost:5560") - -def serve(socket): - while True: - message = socket.recv() - print "Received request: ", message - socket.send("World") -server = spawn(serve, socket) - - -# client -context = zmq.Context() -socket = context.socket(zmq.REQ) -socket.connect("tcp://localhost:5559") - -# Do 10 requests, waiting each time for a response -def client(): - for request in range(1,10): - socket.send("Hello") - message = socket.recv() - print "Received reply ", request, "[", message, "]" - - -# broker -frontend = context.socket(zmq.ROUTER) -backend = context.socket(zmq.DEALER); -frontend.bind("tcp://*:5559") -backend.bind("tcp://*:5560") - -def proxy(socket_from, socket_to): - while True: - m = socket_from.recv_multipart() - socket_to.send_multipart(m) - -a = spawn(proxy, frontend, backend) -b = spawn(proxy, backend, frontend) - -spawn(client).join() diff --git a/examples/gevent/simple.py b/examples/gevent/simple.py deleted file mode 100644 index ae065b3..0000000 --- a/examples/gevent/simple.py +++ /dev/null @@ -1,37 +0,0 @@ -from gevent import spawn, spawn_later -import zmq.green as zmq - -# server -print zmq.Context -ctx = zmq.Context() -sock = ctx.socket(zmq.PUSH) -sock.bind('ipc:///tmp/zmqtest') - -spawn(sock.send_pyobj, ('this', 'is', 'a', 'python', 'tuple')) -spawn_later(1, sock.send_pyobj, {'hi': 1234}) -spawn_later(2, sock.send_pyobj, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42)) -spawn_later(3, sock.send_pyobj, 'foobar') -spawn_later(4, sock.send_pyobj, 'quit') - - -# client -ctx = zmq.Context() # create a new context to kick the wheels -sock = ctx.socket(zmq.PULL) -sock.connect('ipc:///tmp/zmqtest') - -def get_objs(sock): - while True: - o = sock.recv_pyobj() - print 'received python object:', o - if o == 'quit': - print 'exiting.' - break - -def print_every(s, t=None): - print s - if t: - spawn_later(t, print_every, s, t) - -print_every('printing every half second', 0.5) -spawn(get_objs, sock).join() - diff --git a/examples/heartbeat/heart.py b/examples/heartbeat/heart.py deleted file mode 100644 index 175370e..0000000 --- a/examples/heartbeat/heart.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -"""This launches an echoing rep socket device, -and runs a blocking numpy action. The rep socket should -remain responsive to pings during this time. Use heartbeater.py to -ping this heart, and see the responsiveness. - -Authors -------- -* MinRK -""" - -import time -import numpy -import zmq -from zmq import devices - -ctx = zmq.Context() - -dev = devices.ThreadDevice(zmq.FORWARDER, zmq.SUB, zmq.DEALER) -dev.setsockopt_in(zmq.SUBSCRIBE, "") -dev.connect_in('tcp://127.0.0.1:5555') -dev.connect_out('tcp://127.0.0.1:5556') -dev.start() - -#wait for connections -time.sleep(1) - -A = numpy.random.random((2**11,2**11)) -print "starting blocking loop" -while True: - tic = time.time() - numpy.dot(A,A.transpose()) - print "blocked for %.3f s"%(time.time()-tic) - diff --git a/examples/heartbeat/heartbeater.py b/examples/heartbeat/heartbeater.py deleted file mode 100644 index 180828a..0000000 --- a/examples/heartbeat/heartbeater.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python -""" - -For use with heart.py - -A basic heartbeater using PUB and ROUTER sockets. pings are sent out on the PUB, and hearts -are tracked based on their DEALER identities. - -You can start many hearts with heart.py, and the heartbeater will monitor all of them, and notice when they stop responding. - -Authors -------- -* MinRK -""" - -import time -import zmq -from zmq.eventloop import ioloop, zmqstream - - -class HeartBeater(object): - """A basic HeartBeater class - pingstream: a PUB stream - pongstream: an ROUTER stream""" - - def __init__(self, loop, pingstream, pongstream, period=1000): - self.loop = loop - self.period = period - - self.pingstream = pingstream - self.pongstream = pongstream - self.pongstream.on_recv(self.handle_pong) - - self.hearts = set() - self.responses = set() - self.lifetime = 0 - self.tic = time.time() - - self.caller = ioloop.PeriodicCallback(self.beat, period, self.loop) - self.caller.start() - - def beat(self): - toc = time.time() - self.lifetime += toc-self.tic - self.tic = toc - print self.lifetime - # self.message = str(self.lifetime) - goodhearts = self.hearts.intersection(self.responses) - heartfailures = self.hearts.difference(goodhearts) - newhearts = self.responses.difference(goodhearts) - # print newhearts, goodhearts, heartfailures - map(self.handle_new_heart, newhearts) - map(self.handle_heart_failure, heartfailures) - self.responses = set() - print "%i beating hearts: %s"%(len(self.hearts),self.hearts) - self.pingstream.send(str(self.lifetime)) - - def handle_new_heart(self, heart): - print "yay, got new heart %s!"%heart - self.hearts.add(heart) - - def handle_heart_failure(self, heart): - print "Heart %s failed :("%heart - self.hearts.remove(heart) - - - def handle_pong(self, msg): - "if heart is beating" - if msg[1] == str(self.lifetime): - self.responses.add(msg[0]) - else: - print "got bad heartbeat (possibly old?): %s"%msg[1] - -# sub.setsockopt(zmq.SUBSCRIBE) - - -if __name__ == '__main__': - loop = ioloop.IOLoop() - context = zmq.Context() - pub = context.socket(zmq.PUB) - pub.bind('tcp://127.0.0.1:5555') - router = context.socket(zmq.ROUTER) - router.bind('tcp://127.0.0.1:5556') - - outstream = zmqstream.ZMQStream(pub, loop) - instream = zmqstream.ZMQStream(router, loop) - - hb = HeartBeater(loop, outstream, instream) - - loop.start() diff --git a/examples/heartbeat/ping.py b/examples/heartbeat/ping.py deleted file mode 100644 index 933a39a..0000000 --- a/examples/heartbeat/ping.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -"""For use with pong.py - -This script simply pings a process started by pong.py or tspong.py, to -demonstrate that zmq remains responsive while Python blocks. - -Authors -------- -* MinRK -""" - -import time -import numpy -import zmq - -ctx = zmq.Context() - -req = ctx.socket(zmq.REQ) -req.connect('tcp://127.0.0.1:10111') - -#wait for connects -time.sleep(1) -n=0 -while True: - time.sleep(numpy.random.random()) - for i in range(4): - n+=1 - msg = 'ping %i'%n - tic = time.time() - req.send(msg) - resp = req.recv() - print "%s: %.2f ms" % (msg, 1000*(time.time()-tic)) - assert msg == resp - diff --git a/examples/heartbeat/pong.py b/examples/heartbeat/pong.py deleted file mode 100644 index 47efb3a..0000000 --- a/examples/heartbeat/pong.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -"""This launches an echoing rep socket device using -zmq.devices.ThreadDevice, and runs a blocking numpy action. -The rep socket should remain responsive to pings during this time. - -Use ping.py to see how responsive it is. - -Authors -------- -* MinRK -""" - -import time -import numpy -import zmq -from zmq import devices - -ctx = zmq.Context() - -dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1) -dev.bind_in('tcp://127.0.0.1:10111') -dev.setsockopt_in(zmq.IDENTITY, "whoda") -dev.start() - -#wait for connections -time.sleep(1) - -A = numpy.random.random((2**11,2**12)) -print "starting blocking loop" -while True: - tic = time.time() - numpy.dot(A,A.transpose()) - print "blocked for %.3f s"%(time.time()-tic) - diff --git a/examples/logger/zmqlogger.py b/examples/logger/zmqlogger.py deleted file mode 100644 index c55b51b..0000000 --- a/examples/logger/zmqlogger.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Simple example of using zmq log handlers - -This starts a number of subprocesses with PUBHandlers that generate -log messages at a regular interval. The main process has a SUB socket, -which aggregates and logs all of the messages to the root logger. -""" - -import logging -from multiprocessing import Process -import os -import random -import sys -import time - -import zmq -from zmq.log.handlers import PUBHandler - -LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL) - -def sub_logger(port, level=logging.DEBUG): - ctx = zmq.Context() - sub = ctx.socket(zmq.SUB) - sub.bind('tcp://127.0.0.1:%i' % port) - sub.setsockopt(zmq.SUBSCRIBE, "") - logging.basicConfig(level=level) - - while True: - level, message = sub.recv_multipart() - if message.endswith('\n'): - # trim trailing newline, which will get appended again - message = message[:-1] - log = getattr(logging, level.lower()) - log(message) - -def log_worker(port, interval=1, level=logging.DEBUG): - ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - pub.connect('tcp://127.0.0.1:%i' % port) - - logger = logging.getLogger(str(os.getpid())) - logger.setLevel(level) - handler = PUBHandler(pub) - logger.addHandler(handler) - print "starting logger at %i with level=%s" % (os.getpid(), level) - - while True: - level = random.choice(LOG_LEVELS) - logger.log(level, "Hello from %i!" % os.getpid()) - time.sleep(interval) - -if __name__ == '__main__': - if len(sys.argv) > 1: - n = int(sys.argv[1]) - else: - n = 2 - - port = 5555 - - # start the log generators - workers = [ Process(target=log_worker, args=(port,), kwargs=dict(level=random.choice(LOG_LEVELS))) for i in range(n) ] - [ w.start() for w in workers ] - - # start the log watcher - try: - sub_logger(port) - except KeyboardInterrupt: - pass - finally: - [ w.terminate() for w in workers ] diff --git a/examples/mongodb/client.py b/examples/mongodb/client.py deleted file mode 100644 index 839dce7..0000000 --- a/examples/mongodb/client.py +++ /dev/null @@ -1,46 +0,0 @@ -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Justin Riley -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import json -import zmq - -class MongoZMQClient(object): - """ - Client that connects with MongoZMQ server to add/fetch docs - """ - - def __init__(self, connect_addr='tcp://127.0.0.1:5000'): - self._context = zmq.Context() - self._socket = self._context.socket(zmq.DEALER) - self._socket.connect(connect_addr) - - def _send_recv_msg(self, msg): - self._socket.send_multipart(msg) - return self._socket.recv_multipart()[0] - - def get_doc(self, keys): - msg = ['get', json.dumps(keys)] - json_str = self._send_recv_msg(msg) - return json.loads(json_str) - - def add_doc(self, doc): - msg = ['add', json.dumps(doc)] - return self._send_recv_msg(msg) - -def main(): - client = MongoZMQClient() - for i in range(10): - doc = {'job': str(i)} - print "Adding doc", doc - print client.add_doc(doc) - for i in range(10): - query = {'job': str(i)} - print "Getting doc matching query:", query - print client.get_doc(query) - -if __name__ == "__main__": - main() diff --git a/examples/mongodb/controller.py b/examples/mongodb/controller.py deleted file mode 100644 index e154f1c..0000000 --- a/examples/mongodb/controller.py +++ /dev/null @@ -1,91 +0,0 @@ -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Justin Riley -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import sys -import zmq -import pymongo -import pymongo.json_util -import json - -class MongoZMQ(object): - """ - ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB. - - NOTE: mongod must be started before using this class - """ - - def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"): - """ - bind_addr: address to bind zmq socket on - db_name: name of database to write to (created if doesnt exist) - table_name: name of mongodb 'table' in the db to write to (created if doesnt exist) - """ - self._bind_addr = bind_addr - self._db_name = db_name - self._table_name = table_name - self._conn = pymongo.Connection() - self._db = self._conn[self._db_name] - self._table = self._db[self._table_name] - - def _doc_to_json(self, doc): - return json.dumps(doc,default=pymongo.json_util.default) - - def add_document(self, doc): - """ - Inserts a document (dictionary) into mongo database table - """ - print 'adding docment %s' % (doc) - try: - self._table.insert(doc) - except Exception,e: - return 'Error: %s' % e - - def get_document_by_keys(self, keys): - """ - Attempts to return a single document from database table that matches - each key/value in keys dictionary. - """ - print 'attempting to retrieve document using keys: %s' % keys - try: - return self._table.find_one(keys) - except Exception,e: - return 'Error: %s' % e - - def start(self): - context = zmq.Context() - socket = context.socket(zmq.ROUTER) - socket.bind(self._bind_addr) - while True: - msg = socket.recv_multipart() - print "Received msg: ", msg - if len(msg) != 3: - error_msg = 'invalid message received: %s' % msg - print error_msg - reply = [msg[0], error_msg] - socket.send_multipart(reply) - continue - id = msg[0] - operation = msg[1] - contents = json.loads(msg[2]) - # always send back the id with ROUTER - reply = [id] - if operation == 'add': - self.add_document(contents) - reply.append("success") - elif operation == 'get': - doc = self.get_document_by_keys(contents) - json_doc = self._doc_to_json(doc) - reply.append(json_doc) - else: - print 'unknown request' - socket.send_multipart(reply) - -def main(): - MongoZMQ('ipcontroller','jobs').start() - -if __name__ == "__main__": - main() diff --git a/examples/monitoring/simple_monitor.py b/examples/monitoring/simple_monitor.py deleted file mode 100644 index 88cdd4c..0000000 --- a/examples/monitoring/simple_monitor.py +++ /dev/null @@ -1,90 +0,0 @@ -# -*- coding: utf-8 -*- -"""Simple example demonstrating the use of the socket monitoring feature.""" - -# This file is part of pyzmq. -# -# Distributed under the terms of the New BSD License. The full -# license is in the file COPYING.BSD, distributed as part of this -# software. -from __future__ import print_function - -__author__ = 'Guido Goldstein' - -import json -import os -import struct -import sys -import threading -import time - -import zmq -from zmq.utils.monitor import recv_monitor_message - -line = lambda : print('-' * 40) - -def logger(monitor): - done = False - while monitor.poll(timeout=5000): - evt = recv_monitor_message(monitor) - print(json.dumps(evt, indent=1)) - if evt['event'] == zmq.EVENT_MONITOR_STOPPED: - break - print() - print("Logger done!") - monitor.close() - -print("libzmq-%s" % zmq.zmq_version()) -if zmq.zmq_version_info() < (4,0): - raise RuntimeError("monitoring in libzmq version < 4.0 is not supported") - -print("Event names:") -for name in dir(zmq): - if name.startswith('EVENT_'): - print("%21s : %4i" % (name, getattr(zmq, name))) - - -ctx = zmq.Context().instance() -rep = ctx.socket(zmq.REP) -req = ctx.socket(zmq.REQ) - -monitor = req.get_monitor_socket() - -t = threading.Thread(target=logger, args=(monitor,)) -t.start() - -line() -print("bind req") -req.bind("tcp://127.0.0.1:6666") -req.bind("tcp://127.0.0.1:6667") -time.sleep(1) - -line() -print("connect rep") -rep.connect("tcp://127.0.0.1:6667") -time.sleep(0.2) -rep.connect("tcp://127.0.0.1:6666") -time.sleep(1) - -line() -print("disconnect rep") -rep.disconnect("tcp://127.0.0.1:6667") -time.sleep(1) -rep.disconnect("tcp://127.0.0.1:6666") -time.sleep(1) - -line() -print("close rep") -rep.close() -time.sleep(1) - -line() -print("close req") -req.close() -time.sleep(1) - -line() -print("joining") -t.join() - -print("END") -ctx.term() diff --git a/examples/poll/pair.py b/examples/poll/pair.py deleted file mode 100644 index 81c8b3a..0000000 --- a/examples/poll/pair.py +++ /dev/null @@ -1,56 +0,0 @@ -"""A thorough test of polling PAIR sockets.""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import time -import zmq - -print "Running polling tests for PAIR sockets..." - -addr = 'tcp://127.0.0.1:5555' -ctx = zmq.Context() -s1 = ctx.socket(zmq.PAIR) -s2 = ctx.socket(zmq.PAIR) - -s1.bind(addr) -s2.connect(addr) - -# Sleep to allow sockets to connect. -time.sleep(1.0) - -poller = zmq.Poller() -poller.register(s1, zmq.POLLIN|zmq.POLLOUT) -poller.register(s2, zmq.POLLIN|zmq.POLLOUT) - -# Now make sure that both are send ready. -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT -assert socks[s2] == zmq.POLLOUT - -# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN -s1.send('msg1') -s2.send('msg2') -time.sleep(1.0) -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT|zmq.POLLIN -assert socks[s2] == zmq.POLLOUT|zmq.POLLIN - -# Make sure that both are in POLLOUT after recv. -s1.recv() -s2.recv() -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT -assert socks[s2] == zmq.POLLOUT - -poller.unregister(s1) -poller.unregister(s2) - -# Wait for everything to finish. -time.sleep(1.0) - -print "Finished." \ No newline at end of file diff --git a/examples/poll/pubsub.py b/examples/poll/pubsub.py deleted file mode 100644 index a590fa9..0000000 --- a/examples/poll/pubsub.py +++ /dev/null @@ -1,57 +0,0 @@ -"""A thorough test of polling PUB/SUB sockets.""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import time -import zmq - -print "Running polling tets for PUB/SUB sockets..." - -addr = 'tcp://127.0.0.1:5555' -ctx = zmq.Context() -s1 = ctx.socket(zmq.PUB) -s2 = ctx.socket(zmq.SUB) -s2.setsockopt(zmq.SUBSCRIBE, '') - -s1.bind(addr) -s2.connect(addr) - -# Sleep to allow sockets to connect. -time.sleep(1.0) - -poller = zmq.Poller() -poller.register(s1, zmq.POLLIN|zmq.POLLOUT) -poller.register(s2, zmq.POLLIN|zmq.POLLOUT) - -# Now make sure that both are send ready. -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT -assert not socks.has_key(s2) - -# Make sure that s1 stays in POLLOUT after a send. -s1.send('msg1') -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT - -# Make sure that s2 is POLLIN after waiting. -time.sleep(0.5) -socks = dict(poller.poll()) -assert socks[s2] == zmq.POLLIN - -# Make sure that s2 goes into 0 after recv. -s2.recv() -socks = dict(poller.poll()) -assert not socks.has_key(s2) - -poller.unregister(s1) -poller.unregister(s2) - -# Wait for everything to finish. -time.sleep(1.0) - -print "Finished." diff --git a/examples/poll/reqrep.py b/examples/poll/reqrep.py deleted file mode 100644 index ef4436c..0000000 --- a/examples/poll/reqrep.py +++ /dev/null @@ -1,71 +0,0 @@ -"""A thorough test of polling REQ/REP sockets.""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import time -import zmq - -print "Running polling tests for REQ/REP sockets..." - -addr = 'tcp://127.0.0.1:5555' -ctx = zmq.Context() -s1 = ctx.socket(zmq.REP) -s2 = ctx.socket(zmq.REQ) - -s1.bind(addr) -s2.connect(addr) - -# Sleep to allow sockets to connect. -time.sleep(1.0) - -poller = zmq.Poller() -poller.register(s1, zmq.POLLIN|zmq.POLLOUT) -poller.register(s2, zmq.POLLIN|zmq.POLLOUT) - -# Make sure that s1 is in state 0 and s2 is in POLLOUT -socks = dict(poller.poll()) -assert not socks.has_key(s1) -assert socks[s2] == zmq.POLLOUT - -# Make sure that s2 goes immediately into state 0 after send. -s2.send('msg1') -socks = dict(poller.poll()) -assert not socks.has_key(s2) - -# Make sure that s1 goes into POLLIN state after a time.sleep(). -time.sleep(0.5) -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLIN - -# Make sure that s1 goes into POLLOUT after recv. -s1.recv() -socks = dict(poller.poll()) -assert socks[s1] == zmq.POLLOUT - -# Make sure s1 goes into state 0 after send. -s1.send('msg2') -socks = dict(poller.poll()) -assert not socks.has_key(s1) - -# Wait and then see that s2 is in POLLIN. -time.sleep(0.5) -socks = dict(poller.poll()) -assert socks[s2] == zmq.POLLIN - -# Make sure that s2 is in POLLOUT after recv. -s2.recv() -socks = dict(poller.poll()) -assert socks[s2] == zmq.POLLOUT - -poller.unregister(s1) -poller.unregister(s2) - -# Wait for everything to finish. -time.sleep(1.0) - -print "Finished." diff --git a/examples/pubsub/publisher.py b/examples/pubsub/publisher.py deleted file mode 100644 index a2ce6c9..0000000 --- a/examples/pubsub/publisher.py +++ /dev/null @@ -1,57 +0,0 @@ -"""A test that publishes NumPy arrays. - -Uses REQ/REP (on PUB/SUB socket + 1) to synchronize -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import sys -import time - -import zmq -import numpy - -def sync(bind_to): - # use bind socket + 1 - sync_with = ':'.join(bind_to.split(':')[:-1] + - [str(int(bind_to.split(':')[-1]) + 1)]) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REP) - s.bind(sync_with) - print "Waiting for subscriber to connect..." - s.recv() - print " Done." - s.send('GO') - -def main(): - if len (sys.argv) != 4: - print 'usage: publisher ' - sys.exit (1) - - try: - bind_to = sys.argv[1] - array_size = int(sys.argv[2]) - array_count = int (sys.argv[3]) - except (ValueError, OverflowError), e: - print 'array-size and array-count must be integers' - sys.exit (1) - - ctx = zmq.Context() - s = ctx.socket(zmq.PUB) - s.bind(bind_to) - - sync(bind_to) - - print "Sending arrays..." - for i in range(array_count): - a = numpy.random.rand(array_size, array_size) - s.send_pyobj(a) - print " Done." - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/subscriber.py b/examples/pubsub/subscriber.py deleted file mode 100644 index b996ad8..0000000 --- a/examples/pubsub/subscriber.py +++ /dev/null @@ -1,74 +0,0 @@ -"""A test that subscribes to NumPy arrays. - -Uses REQ/REP (on PUB/SUB socket + 1) to synchronize -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - - -import sys -import time - -import zmq -import numpy - -def sync(connect_to): - # use connect socket + 1 - sync_with = ':'.join(connect_to.split(':')[:-1] + - [str(int(connect_to.split(':')[-1]) + 1)] - ) - ctx = zmq.Context.instance() - s = ctx.socket(zmq.REQ) - s.connect(sync_with) - s.send('READY') - s.recv() - -def main(): - if len (sys.argv) != 3: - print 'usage: subscriber ' - sys.exit (1) - - try: - connect_to = sys.argv[1] - array_count = int (sys.argv[2]) - except (ValueError, OverflowError), e: - print 'array-count must be integers' - sys.exit (1) - - ctx = zmq.Context() - s = ctx.socket(zmq.SUB) - s.connect(connect_to) - s.setsockopt(zmq.SUBSCRIBE,'') - - sync(connect_to) - - start = time.clock() - - print "Receiving arrays..." - for i in range(array_count): - a = s.recv_pyobj() - print " Done." - - end = time.clock() - - elapsed = (end - start) * 1000000 - if elapsed == 0: - elapsed = 1 - throughput = (1000000.0 * float (array_count)) / float (elapsed) - message_size = a.nbytes - megabits = float (throughput * message_size * 8) / 1000000 - - print "message size: %.0f [B]" % (message_size, ) - print "array count: %.0f" % (array_count, ) - print "mean throughput: %.0f [msg/s]" % (throughput, ) - print "mean throughput: %.3f [Mb/s]" % (megabits, ) - - time.sleep(1.0) - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/topics_pub.py b/examples/pubsub/topics_pub.py deleted file mode 100755 index 73b3d1c..0000000 --- a/examples/pubsub/topics_pub.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -"""Simple example of publish/subscribe illustrating topics. - -Publisher and subscriber can be started in any order, though if publisher -starts first, any messages sent before subscriber starts are lost. More than -one subscriber can listen, and they can listen to different topics. - -Topic filtering is done simply on the start of the string, e.g. listening to -'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to -catch 'weather'. -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import itertools -import sys -import time - -import zmq - -def main(): - if len (sys.argv) != 2: - print 'usage: publisher ' - sys.exit (1) - - bind_to = sys.argv[1] - - all_topics = ['sports.general','sports.football','sports.basketball', - 'stocks.general','stocks.GOOG','stocks.AAPL', - 'weather'] - - ctx = zmq.Context() - s = ctx.socket(zmq.PUB) - s.bind(bind_to) - - print "Starting broadcast on topics:" - print " %s" % all_topics - print "Hit Ctrl-C to stop broadcasting." - print "Waiting so subscriber sockets can connect..." - print - time.sleep(1.0) - - msg_counter = itertools.count() - try: - for topic in itertools.cycle(all_topics): - msg_body = str(msg_counter.next()) - print ' Topic: %s, msg:%s' % (topic, msg_body) - s.send_multipart([topic, msg_body]) - # short wait so we don't hog the cpu - time.sleep(0.1) - except KeyboardInterrupt: - pass - - print "Waiting for message queues to flush..." - time.sleep(0.5) - print "Done." - -if __name__ == "__main__": - main() diff --git a/examples/pubsub/topics_sub.py b/examples/pubsub/topics_sub.py deleted file mode 100755 index 4a61fb5..0000000 --- a/examples/pubsub/topics_sub.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python -"""Simple example of publish/subscribe illustrating topics. - -Publisher and subscriber can be started in any order, though if publisher -starts first, any messages sent before subscriber starts are lost. More than -one subscriber can listen, and they can listen to different topics. - -Topic filtering is done simply on the start of the string, e.g. listening to -'s' will catch 'sports...' and 'stocks' while listening to 'w' is enough to -catch 'weather'. -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010 Brian Granger, Fernando Perez -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -import sys -import time - -import zmq -import numpy - -def main(): - if len (sys.argv) < 2: - print 'usage: subscriber [topic topic ...]' - sys.exit (1) - - connect_to = sys.argv[1] - topics = sys.argv[2:] - - ctx = zmq.Context() - s = ctx.socket(zmq.SUB) - s.connect(connect_to) - - # manage subscriptions - if not topics: - print "Receiving messages on ALL topics..." - s.setsockopt(zmq.SUBSCRIBE,'') - else: - print "Receiving messages on topics: %s ..." % topics - for t in topics: - s.setsockopt(zmq.SUBSCRIBE,t) - print - try: - while True: - topic, msg = s.recv_multipart() - print ' Topic: %s, msg:%s' % (topic, msg) - except KeyboardInterrupt: - pass - print "Done." - -if __name__ == "__main__": - main() diff --git a/examples/security/generate_certificates.py b/examples/security/generate_certificates.py deleted file mode 100644 index 80db258..0000000 --- a/examples/security/generate_certificates.py +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env python - -""" -Generate client and server CURVE certificate files then move them into the -appropriate store directory, private_keys or public_keys. The certificates -generated by this script are used by the stonehouse and ironhouse examples. - -In practice this would be done by hand or some out-of-band process. - -Author: Chris Laws -""" - -import os -import shutil -import zmq.auth - -def generate_certificates(base_dir): - ''' Generate client and server CURVE certificate files''' - keys_dir = os.path.join(base_dir, 'certificates') - public_keys_dir = os.path.join(base_dir, 'public_keys') - secret_keys_dir = os.path.join(base_dir, 'private_keys') - - # Create directories for certificates, remove old content if necessary - for d in [keys_dir, public_keys_dir, secret_keys_dir]: - if os.path.exists(d): - shutil.rmtree(d) - os.mkdir(d) - - # create new keys in certificates dir - server_public_file, server_secret_file = zmq.auth.create_certificates(keys_dir, "server") - client_public_file, client_secret_file = zmq.auth.create_certificates(keys_dir, "client") - - # move public keys to appropriate directory - for key_file in os.listdir(keys_dir): - if key_file.endswith(".key"): - shutil.move(os.path.join(keys_dir, key_file), - os.path.join(public_keys_dir, '.')) - - # move secret keys to appropriate directory - for key_file in os.listdir(keys_dir): - if key_file.endswith(".key_secret"): - shutil.move(os.path.join(keys_dir, key_file), - os.path.join(secret_keys_dir, '.')) - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - generate_certificates(os.path.dirname(__file__)) diff --git a/examples/security/grasslands.py b/examples/security/grasslands.py deleted file mode 100644 index cbd3ab9..0000000 --- a/examples/security/grasslands.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python - -''' -No protection at all. - -All connections are accepted, there is no authentication, and no privacy. - -This is how ZeroMQ always worked until we built security into the wire -protocol in early 2013. Internally, it uses a security mechanism called -"NULL". - -Author: Chris Laws -''' - -import zmq - - -ctx = zmq.Context().instance() - -server = ctx.socket(zmq.PUSH) -server.bind('tcp://*:9000') - -client = ctx.socket(zmq.PULL) -client.connect('tcp://127.0.0.1:9000') - -server.send(b"Hello") -msg = client.recv() -if msg == b"Hello": - print("Grasslands test OK") diff --git a/examples/security/ioloop-ironhouse.py b/examples/security/ioloop-ironhouse.py deleted file mode 100644 index 11fd400..0000000 --- a/examples/security/ioloop-ironhouse.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env python - -''' -Ironhouse extends Stonehouse with client public key authentication. - -This is the strongest security model we have today, protecting against every -attack we know about, except end-point attacks (where an attacker plants -spyware on a machine to capture data before it's encrypted, or after it's -decrypted). - -This example demonstrates using the IOLoopAuthenticator. - -Author: Chris Laws -''' - -import logging -import os -import sys - -import zmq -import zmq.auth -from zmq.auth.ioloop import IOLoopAuthenticator -from zmq.eventloop import ioloop, zmqstream - -def echo(server, msg): - logging.debug("server recvd %s", msg) - reply = msg + [b'World'] - logging.debug("server sending %s", reply) - server.send_multipart(reply) - -def setup_server(server_secret_file, endpoint='tcp://127.0.0.1:9000'): - """setup a simple echo server with CURVE auth""" - server = zmq.Context.instance().socket(zmq.ROUTER) - - server_public, server_secret = zmq.auth.load_certificate(server_secret_file) - server.curve_secretkey = server_secret - server.curve_publickey = server_public - server.curve_server = True # must come before bind - server.bind(endpoint) - - server_stream = zmqstream.ZMQStream(server) - # simple echo - server_stream.on_recv_stream(echo) - return server_stream - -def client_msg_recvd(msg): - logging.debug("client recvd %s", msg) - logging.info("Ironhouse test OK") - # stop the loop when we get the reply - ioloop.IOLoop.instance().stop() - -def setup_client(client_secret_file, server_public_file, endpoint='tcp://127.0.0.1:9000'): - """setup a simple client with CURVE auth""" - - client = zmq.Context.instance().socket(zmq.DEALER) - - # We need two certificates, one for the client and one for - # the server. The client must know the server's public key - # to make a CURVE connection. - client_public, client_secret = zmq.auth.load_certificate(client_secret_file) - client.curve_secretkey = client_secret - client.curve_publickey = client_public - - server_public, _ = zmq.auth.load_certificate(server_public_file) - # The client must know the server's public key to make a CURVE connection. - client.curve_serverkey = server_public - client.connect(endpoint) - - client_stream = zmqstream.ZMQStream(client) - client_stream.on_recv(client_msg_recvd) - return client_stream - - -def run(): - '''Run Ironhouse example''' - - # These direcotries are generated by the generate_certificates script - base_dir = os.path.dirname(__file__) - keys_dir = os.path.join(base_dir, 'certificates') - public_keys_dir = os.path.join(base_dir, 'public_keys') - secret_keys_dir = os.path.join(base_dir, 'private_keys') - - if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): - logging.critical("Certificates are missing - run generate_certificates script first") - sys.exit(1) - - # Start an authenticator for this context. - auth = IOLoopAuthenticator() - auth.allow('127.0.0.1') - # Tell authenticator to use the certificate in a directory - auth.configure_curve(domain='*', location=public_keys_dir) - - server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") - server = setup_server(server_secret_file) - server_public_file = os.path.join(public_keys_dir, "server.key") - client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") - client = setup_client(client_secret_file, server_public_file) - client.send(b'Hello') - - auth.start() - ioloop.IOLoop.instance().start() - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - if '-v' in sys.argv: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") - - run() diff --git a/examples/security/ironhouse.py b/examples/security/ironhouse.py deleted file mode 100644 index 5d9faf1..0000000 --- a/examples/security/ironhouse.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -''' -Ironhouse extends Stonehouse with client public key authentication. - -This is the strongest security model we have today, protecting against every -attack we know about, except end-point attacks (where an attacker plants -spyware on a machine to capture data before it's encrypted, or after it's -decrypted). - -Author: Chris Laws -''' - -import logging -import os -import sys - -import zmq -import zmq.auth -from zmq.auth.thread import ThreadAuthenticator - - -def run(): - ''' Run Ironhouse example ''' - - # These direcotries are generated by the generate_certificates script - base_dir = os.path.dirname(__file__) - keys_dir = os.path.join(base_dir, 'certificates') - public_keys_dir = os.path.join(base_dir, 'public_keys') - secret_keys_dir = os.path.join(base_dir, 'private_keys') - - if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): - logging.critical("Certificates are missing - run generate_certificates.py script first") - sys.exit(1) - - ctx = zmq.Context().instance() - - # Start an authenticator for this context. - auth = ThreadAuthenticator(ctx) - auth.start() - auth.allow('127.0.0.1') - # Tell authenticator to use the certificate in a directory - auth.configure_curve(domain='*', location=public_keys_dir) - - server = ctx.socket(zmq.PUSH) - - server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") - server_public, server_secret = zmq.auth.load_certificate(server_secret_file) - server.curve_secretkey = server_secret - server.curve_publickey = server_public - server.curve_server = True # must come before bind - server.bind('tcp://*:9000') - - client = ctx.socket(zmq.PULL) - - # We need two certificates, one for the client and one for - # the server. The client must know the server's public key - # to make a CURVE connection. - client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") - client_public, client_secret = zmq.auth.load_certificate(client_secret_file) - client.curve_secretkey = client_secret - client.curve_publickey = client_public - - server_public_file = os.path.join(public_keys_dir, "server.key") - server_public, _ = zmq.auth.load_certificate(server_public_file) - # The client must know the server's public key to make a CURVE connection. - client.curve_serverkey = server_public - client.connect('tcp://127.0.0.1:9000') - - server.send(b"Hello") - - if client.poll(1000): - msg = client.recv() - if msg == b"Hello": - logging.info("Ironhouse test OK") - else: - logging.error("Ironhouse test FAIL") - - # stop auth thread - auth.stop() - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - if '-v' in sys.argv: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") - - run() diff --git a/examples/security/stonehouse.py b/examples/security/stonehouse.py deleted file mode 100644 index 276e87a..0000000 --- a/examples/security/stonehouse.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -''' -Stonehouse uses the "CURVE" security mechanism. - -This gives us strong encryption on data, and (as far as we know) unbreakable -authentication. Stonehouse is the minimum you would use over public networks, -and assures clients that they are speaking to an authentic server, while -allowing any client to connect. - -Author: Chris Laws -''' - -import logging -import os -import sys -import time - -import zmq -import zmq.auth -from zmq.auth.thread import ThreadAuthenticator - - -def run(): - ''' Run Stonehouse example ''' - - # These directories are generated by the generate_certificates script - base_dir = os.path.dirname(__file__) - keys_dir = os.path.join(base_dir, 'certificates') - public_keys_dir = os.path.join(base_dir, 'public_keys') - secret_keys_dir = os.path.join(base_dir, 'private_keys') - - if not (os.path.exists(keys_dir) and os.path.exists(keys_dir) and os.path.exists(keys_dir)): - logging.critical("Certificates are missing: run generate_certificates.py script first") - sys.exit(1) - - ctx = zmq.Context().instance() - - # Start an authenticator for this context. - auth = ThreadAuthenticator(ctx) - auth.start() - auth.allow('127.0.0.1') - # Tell the authenticator how to handle CURVE requests - auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) - - server = ctx.socket(zmq.PUSH) - server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") - server_public, server_secret = zmq.auth.load_certificate(server_secret_file) - server.curve_secretkey = server_secret - server.curve_publickey = server_public - server.curve_server = True # must come before bind - server.bind('tcp://*:9000') - - client = ctx.socket(zmq.PULL) - # We need two certificates, one for the client and one for - # the server. The client must know the server's public key - # to make a CURVE connection. - client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") - client_public, client_secret = zmq.auth.load_certificate(client_secret_file) - client.curve_secretkey = client_secret - client.curve_publickey = client_public - - # The client must know the server's public key to make a CURVE connection. - server_public_file = os.path.join(public_keys_dir, "server.key") - server_public, _ = zmq.auth.load_certificate(server_public_file) - client.curve_serverkey = server_public - - client.connect('tcp://127.0.0.1:9000') - - server.send(b"Hello") - - if client.poll(1000): - msg = client.recv() - if msg == b"Hello": - logging.info("Stonehouse test OK") - else: - logging.error("Stonehouse test FAIL") - - # stop auth thread - auth.stop() - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - if '-v' in sys.argv: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") - - run() diff --git a/examples/security/strawhouse.py b/examples/security/strawhouse.py deleted file mode 100644 index dc75bd7..0000000 --- a/examples/security/strawhouse.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python - -''' -Allow or deny clients based on IP address. - -Strawhouse, which is plain text with filtering on IP addresses. It still -uses the NULL mechanism, but we install an authentication hook that checks -the IP address against a whitelist or blacklist and allows or denies it -accordingly. - -Author: Chris Laws -''' - -import logging -import sys - -import zmq -import zmq.auth -from zmq.auth.thread import ThreadAuthenticator - - -def run(): - '''Run strawhouse client''' - - allow_test_pass = False - deny_test_pass = False - - ctx = zmq.Context().instance() - - # Start an authenticator for this context. - auth = ThreadAuthenticator(ctx) - auth.start() - - # Part 1 - demonstrate allowing clients based on IP address - auth.allow('127.0.0.1') - - server = ctx.socket(zmq.PUSH) - server.zap_domain = b'global' # must come before bind - server.bind('tcp://*:9000') - - client_allow = ctx.socket(zmq.PULL) - client_allow.connect('tcp://127.0.0.1:9000') - - server.send(b"Hello") - - msg = client_allow.recv() - if msg == b"Hello": - allow_test_pass = True - - client_allow.close() - - # Part 2 - demonstrate denying clients based on IP address - auth.stop() - - auth = ThreadAuthenticator(ctx) - auth.start() - - auth.deny('127.0.0.1') - - client_deny = ctx.socket(zmq.PULL) - client_deny.connect('tcp://127.0.0.1:9000') - - if server.poll(50, zmq.POLLOUT): - server.send(b"Hello") - - if client_deny.poll(50): - msg = client_deny.recv() - else: - deny_test_pass = True - else: - deny_test_pass = True - - client_deny.close() - - auth.stop() # stop auth thread - - if allow_test_pass and deny_test_pass: - logging.info("Strawhouse test OK") - else: - logging.error("Strawhouse test FAIL") - - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - if '-v' in sys.argv: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") - - run() diff --git a/examples/security/woodhouse.py b/examples/security/woodhouse.py deleted file mode 100644 index efedee4..0000000 --- a/examples/security/woodhouse.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python - -''' -Woodhouse extends Strawhouse with a name and password check. - -This uses the PLAIN mechanism which does plain-text username and password authentication). -It's not really secure, and anyone sniffing the network (trivial with WiFi) -can capture passwords and then login. - -Author: Chris Laws -''' - -import logging -import sys - -import zmq -import zmq.auth -from zmq.auth.thread import ThreadAuthenticator - -def run(): - '''Run woodhouse example''' - - valid_client_test_pass = False - invalid_client_test_pass = False - - ctx = zmq.Context().instance() - - # Start an authenticator for this context. - auth = ThreadAuthenticator(ctx) - auth.start() - auth.allow('127.0.0.1') - # Instruct authenticator to handle PLAIN requests - auth.configure_plain(domain='*', passwords={'admin': 'secret'}) - - server = ctx.socket(zmq.PUSH) - server.plain_server = True # must come before bind - server.bind('tcp://*:9000') - - client = ctx.socket(zmq.PULL) - client.plain_username = b'admin' - client.plain_password = b'secret' - client.connect('tcp://127.0.0.1:9000') - - server.send(b"Hello") - - if client.poll(): - msg = client.recv() - if msg == b"Hello": - valid_client_test_pass = True - - client.close() - - - # now use invalid credentials - expect no msg received - client2 = ctx.socket(zmq.PULL) - client2.plain_username = b'admin' - client2.plain_password = b'bogus' - client2.connect('tcp://127.0.0.1:9000') - - server.send(b"World") - - if client2.poll(50): - msg = client.recv() - if msg == "World": - invalid_client_test_pass = False - else: - # no message is expected - invalid_client_test_pass = True - - # stop auth thread - auth.stop() - - if valid_client_test_pass and invalid_client_test_pass: - logging.info("Woodhouse test OK") - else: - logging.error("Woodhouse test FAIL") - - -if __name__ == '__main__': - if zmq.zmq_version_info() < (4,0): - raise RuntimeError("Security is not supported in libzmq version < 4.0. libzmq version {0}".format(zmq.zmq_version())) - - if '-v' in sys.argv: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") - - run() diff --git a/examples/serialization/serialsocket.py b/examples/serialization/serialsocket.py deleted file mode 100644 index 7329bb9..0000000 --- a/examples/serialization/serialsocket.py +++ /dev/null @@ -1,74 +0,0 @@ -"""A Socket subclass that adds some serialization methods.""" - -import zlib -import pickle - -import numpy - -import zmq - -class SerializingSocket(zmq.Socket): - """A class with some extra serialization methods - - send_zipped_pickle is just like send_pyobj, but uses - zlib to compress the stream before sending. - - send_array sends numpy arrays with metadata necessary - for reconstructing the array on the other side (dtype,shape). - """ - - def send_zipped_pickle(self, obj, flags=0, protocol=-1): - """pack and compress an object with pickle and zlib.""" - pobj = pickle.dumps(obj, protocol) - zobj = zlib.compress(pobj) - print('zipped pickle is %i bytes' % len(zobj)) - return self.send(zobj, flags=flags) - - def recv_zipped_pickle(self, flags=0): - """reconstruct a Python object sent with zipped_pickle""" - zobj = self.recv(flags) - pobj = zlib.decompress(zobj) - return pickle.loads(pobj) - - def send_array(self, A, flags=0, copy=True, track=False): - """send a numpy array with metadata""" - md = dict( - dtype = str(A.dtype), - shape = A.shape, - ) - self.send_json(md, flags|zmq.SNDMORE) - return self.send(A, flags, copy=copy, track=track) - - def recv_array(self, flags=0, copy=True, track=False): - """recv a numpy array""" - md = self.recv_json(flags=flags) - msg = self.recv(flags=flags, copy=copy, track=track) - A = numpy.frombuffer(msg, dtype=md['dtype']) - return A.reshape(md['shape']) - -class SerializingContext(zmq.Context): - _socket_class = SerializingSocket - -def main(): - ctx = SerializingContext() - req = ctx.socket(zmq.REQ) - rep = ctx.socket(zmq.REP) - - rep.bind('inproc://a') - req.connect('inproc://a') - A = numpy.ones((1024,1024)) - print ("Array is %i bytes" % (len(A) * 8)) - - # send/recv with pickle+zip - req.send_zipped_pickle(A) - B = rep.recv_zipped_pickle() - # now try non-copying version - rep.send_array(A, copy=False) - C = req.recv_array(copy=False) - print ("Checking zipped pickle...") - print ("Okay" if (A==B).all() else "Failed") - print ("Checking send_array...") - print ("Okay" if (C==B).all() else "Failed") - -if __name__ == '__main__': - main() \ No newline at end of file -- cgit v1.2.3