summaryrefslogtreecommitdiff
path: root/zmq/devices/monitoredqueue.pyx
diff options
context:
space:
mode:
Diffstat (limited to 'zmq/devices/monitoredqueue.pyx')
-rw-r--r--zmq/devices/monitoredqueue.pyx103
1 files changed, 103 insertions, 0 deletions
diff --git a/zmq/devices/monitoredqueue.pyx b/zmq/devices/monitoredqueue.pyx
new file mode 100644
index 0000000..d5fec64
--- /dev/null
+++ b/zmq/devices/monitoredqueue.pyx
@@ -0,0 +1,103 @@
+"""MonitoredQueue classes and functions.
+
+Authors
+-------
+* MinRK
+* Brian Granger
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
+#
+# This file is part of pyzmq
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+cdef extern from "Python.h":
+ ctypedef int Py_ssize_t
+
+from libc.string cimport memcpy
+
+from buffers cimport asbuffer_r
+from libzmq cimport *
+
+from zmq.backend.cython.socket cimport Socket
+from zmq.backend.cython.checkrc cimport _check_rc
+
+from zmq import ROUTER, ZMQError
+
+#-----------------------------------------------------------------------------
+# MonitoredQueue functions
+#-----------------------------------------------------------------------------
+
+
+def monitored_queue(Socket in_socket, Socket out_socket, Socket mon_socket,
+ bytes in_prefix=b'in', bytes out_prefix=b'out'):
+ """monitored_queue(in_socket, out_socket, mon_socket,
+ in_prefix=b'in', out_prefix=b'out')
+
+ Start a monitored queue device.
+
+ A monitored queue is very similar to the zmq.proxy device (monitored queue came first).
+
+ Differences from zmq.proxy:
+
+ - monitored_queue supports both in and out being ROUTER sockets
+ (via swapping IDENTITY prefixes).
+ - monitor messages are prefixed, making in and out messages distinguishable.
+
+ Parameters
+ ----------
+ in_socket : Socket
+ One of the sockets to the Queue. Its messages will be prefixed with
+ 'in'.
+ out_socket : Socket
+ One of the sockets to the Queue. Its messages will be prefixed with
+ 'out'. The only difference between in/out socket is this prefix.
+ mon_socket : Socket
+ This socket sends out every message received by each of the others
+ with an in/out prefix specifying which one it was.
+ in_prefix : str
+ Prefix added to broadcast messages from in_socket.
+ out_prefix : str
+ Prefix added to broadcast messages from out_socket.
+ """
+
+ cdef void *ins=in_socket.handle
+ cdef void *outs=out_socket.handle
+ cdef void *mons=mon_socket.handle
+ cdef zmq_msg_t in_msg
+ cdef zmq_msg_t out_msg
+ cdef bint swap_ids
+ cdef char *msg_c = NULL
+ cdef Py_ssize_t msg_c_len
+ cdef int rc
+
+ # force swap_ids if both ROUTERs
+ swap_ids = (in_socket.type == ROUTER and out_socket.type == ROUTER)
+
+ # build zmq_msg objects from str prefixes
+ asbuffer_r(in_prefix, <void **>&msg_c, &msg_c_len)
+ rc = zmq_msg_init_size(&in_msg, msg_c_len)
+ _check_rc(rc)
+
+ memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
+
+ asbuffer_r(out_prefix, <void **>&msg_c, &msg_c_len)
+
+ rc = zmq_msg_init_size(&out_msg, msg_c_len)
+ _check_rc(rc)
+
+ with nogil:
+ memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
+ rc = c_monitored_queue(ins, outs, mons, &in_msg, &out_msg, swap_ids)
+ _check_rc(rc)
+ return rc
+
+__all__ = ['monitored_queue']