From 7d5c3dcd969161322deed6c43f8a6a3cb92c3369 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:53:55 -0500 Subject: upgrade to 14.4.1 --- zmq/devices/monitoredqueue.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 zmq/devices/monitoredqueue.py (limited to 'zmq/devices/monitoredqueue.py') diff --git a/zmq/devices/monitoredqueue.py b/zmq/devices/monitoredqueue.py new file mode 100644 index 0000000..c6d9142 --- /dev/null +++ b/zmq/devices/monitoredqueue.py @@ -0,0 +1,37 @@ +"""pure Python monitored_queue function + +For use when Cython extension is unavailable (PyPy). + +Authors +------- +* MinRK +""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +import zmq + +def _relay(ins, outs, sides, prefix, swap_ids): + msg = ins.recv_multipart() + if swap_ids: + msg[:2] = msg[:2][::-1] + outs.send_multipart(msg) + sides.send_multipart([prefix] + msg) + +def monitored_queue(in_socket, out_socket, mon_socket, + in_prefix=b'in', out_prefix=b'out'): + + swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER + + poller = zmq.Poller() + poller.register(in_socket, zmq.POLLIN) + poller.register(out_socket, zmq.POLLIN) + while True: + events = dict(poller.poll()) + if in_socket in events: + _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids) + if out_socket in events: + _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids) + +__all__ = ['monitored_queue'] -- cgit v1.2.3