summaryrefslogtreecommitdiff
path: root/zmq/devices/proxydevice.py
blob: 68be3f159778fb63365503decec44f14322037bd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Proxy classes and functions."""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

import zmq
from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice


class ProxyBase(object):
    """Mixin adding monitor-socket configuration on top of Device.

    The ``*_mon`` methods only queue addresses and options; the queued
    items are applied to the monitor socket in ``_setup_sockets``.
    """

    def __init__(self, in_type, out_type, mon_type=zmq.PUB):
        """Initialise the Device state and the empty monitor-socket queues.

        in_type and out_type are forwarded to Device.__init__; mon_type is
        the socket type used when the monitor socket is created.
        """
        Device.__init__(self, in_type=in_type, out_type=out_type)
        self.mon_type = mon_type
        # Pending monitor-socket configuration, replayed by _setup_sockets().
        self._mon_binds, self._mon_connects, self._mon_sockopts = [], [], []

    def bind_mon(self, addr):
        """Queue a ZMQ address that mon_socket will bind to.

        See zmq.Socket.bind for details.
        """
        self._mon_binds.append(addr)

    def connect_mon(self, addr):
        """Queue a ZMQ address that mon_socket will connect to.

        See zmq.Socket.connect for details.
        """
        self._mon_connects.append(addr)

    def setsockopt_mon(self, opt, value):
        """Queue a setsockopt(opt, value) call for mon_socket.

        See zmq.Socket.setsockopt for details.
        """
        self._mon_sockopts.append((opt, value))

    def _setup_sockets(self):
        """Build the in/out sockets via Device, then the monitor socket.

        Returns the tuple (in_socket, out_socket, mon_socket).
        """
        frontend, backend = Device._setup_sockets(self)
        monitor = self._context.socket(self.mon_type)

        # Socket options are applied before any bind/connect
        # (must be done first, in case of zmq.IDENTITY).
        for option, setting in self._mon_sockopts:
            monitor.setsockopt(option, setting)

        for endpoint in self._mon_binds:
            monitor.bind(endpoint)

        for endpoint in self._mon_connects:
            monitor.connect(endpoint)

        return frontend, backend, monitor

    def run_device(self):
        """Set up the three sockets and pass them to zmq.proxy."""
        frontend, backend, monitor = self._setup_sockets()
        zmq.proxy(frontend, backend, monitor)

class Proxy(ProxyBase, Device):
    """Threadsafe Proxy object.

    Most of the interface is described by zmq.devices.Device. In addition,
    every <method>_{in|out} method has a <method>_mon counterpart that
    configures the monitor socket.

    A Proxy is a 3-socket ZMQ Device: it behaves just like a QUEUE, and
    additionally sends each message out on the monitor socket.

    The mon_socket is not required to be a PUB socket, although PUB is the
    most logical choice.
    """

class ThreadProxy(ProxyBase, ThreadDevice):
    """Proxy built on ThreadDevice (runs in a Thread). See Proxy for more."""

class ProcessProxy(ProxyBase, ProcessDevice):
    """Proxy built on ProcessDevice (runs in a Process). See Proxy for more."""


# Public API of this module: the three concrete proxy classes.
__all__ = ['Proxy', 'ThreadProxy', 'ProcessProxy']