summaryrefslogtreecommitdiff
path: root/zmq/devices/monitoredqueue.pyx
blob: d5fec64d87cc0d3cbd8f6bcac48a8dbf84b34f95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""MonitoredQueue classes and functions.

Authors
-------
* MinRK
* Brian Granger
"""

#-----------------------------------------------------------------------------
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
#
#  This file is part of pyzmq
#
#  Distributed under the terms of the New BSD License.  The full license is in
#  the file COPYING.BSD, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

cdef extern from "Python.h":
    ctypedef int Py_ssize_t

from libc.string cimport memcpy

from buffers cimport asbuffer_r
from libzmq cimport *

from zmq.backend.cython.socket cimport Socket
from zmq.backend.cython.checkrc cimport _check_rc

from zmq import ROUTER, ZMQError

#-----------------------------------------------------------------------------
# MonitoredQueue functions
#-----------------------------------------------------------------------------


def monitored_queue(Socket in_socket, Socket out_socket, Socket mon_socket,
                    bytes in_prefix=b'in', bytes out_prefix=b'out'):
    """monitored_queue(in_socket, out_socket, mon_socket,
                       in_prefix=b'in', out_prefix=b'out')
    
    Start a monitored queue device.
    
    A monitored queue is very similar to the zmq.proxy device (monitored queue came first).
    
    Differences from zmq.proxy:
    
    - monitored_queue supports both in and out being ROUTER sockets
      (via swapping IDENTITY prefixes).
    - monitor messages are prefixed, making in and out messages distinguishable.
    
    Parameters
    ----------
    in_socket : Socket
        One of the sockets to the Queue. Its messages will be prefixed with
        `in_prefix` (b'in' by default).
    out_socket : Socket
        One of the sockets to the Queue. Its messages will be prefixed with
        `out_prefix` (b'out' by default). The only difference between in/out
        socket is this prefix.
    mon_socket : Socket
        This socket sends out every message received by each of the others
        with an in/out prefix specifying which one it was.
    in_prefix : bytes
        Prefix added to broadcast messages from in_socket.
    out_prefix : bytes
        Prefix added to broadcast messages from out_socket.
    
    Returns
    -------
    rc : int
        The return code of the underlying C queue loop, after it has been
        passed through ``_check_rc``.
    
    Notes
    -----
    Every libzmq return code is handed to ``_check_rc``, which is expected to
    raise on failure (presumably ZMQError -- confirm against checkrc).
    """
    
    cdef void *ins = in_socket.handle
    cdef void *outs = out_socket.handle
    cdef void *mons = mon_socket.handle
    cdef zmq_msg_t in_msg
    cdef zmq_msg_t out_msg
    cdef bint swap_ids
    cdef char *in_c = NULL
    cdef char *out_c = NULL
    cdef Py_ssize_t in_len = 0
    cdef Py_ssize_t out_len = 0
    cdef int rc

    # force swap_ids if both ROUTERs
    swap_ids = (in_socket.type == ROUTER and out_socket.type == ROUTER)
    
    # Resolve both prefix buffers *before* allocating any zmq message, so a
    # bad prefix object (e.g. None) raises while there is nothing to clean up.
    asbuffer_r(in_prefix, <void **>&in_c, &in_len)
    asbuffer_r(out_prefix, <void **>&out_c, &out_len)
    
    # build zmq_msg objects from the bytes prefixes
    rc = zmq_msg_init_size(&in_msg, in_len)
    _check_rc(rc)
    memcpy(zmq_msg_data(&in_msg), in_c, zmq_msg_size(&in_msg))
    
    rc = zmq_msg_init_size(&out_msg, out_len)
    try:
        _check_rc(rc)
    except:
        # in_msg is already initialised at this point; release it (and
        # out_msg, if its init actually succeeded) instead of leaking it.
        if rc == 0:
            zmq_msg_close(&out_msg)
        zmq_msg_close(&in_msg)
        raise
    memcpy(zmq_msg_data(&out_msg), out_c, zmq_msg_size(&out_msg))
    
    # The queue loop blocks, so run it without the GIL.
    with nogil:
        rc = c_monitored_queue(ins, outs, mons, &in_msg, &out_msg, swap_ids)
    # NOTE(review): in_msg/out_msg are not closed after the loop returns, as in
    # the original; whether c_monitored_queue takes ownership is not visible
    # from this file -- confirm before adding zmq_msg_close here.
    _check_rc(rc)
    return rc

# Public API of this module: only the monitored_queue entry point is exported.
__all__ = ['monitored_queue']