summaryrefslogtreecommitdiff
path: root/zmq/eventloop/ioloop.py
blob: 35f4c418f12c7ebc9e05b5ee9faf473b3f627c70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# coding: utf-8
"""tornado IOLoop API with zmq compatibility

If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.

The minimal shipped version of tornado's IOLoop does not include
support for concurrent futures - this will only be available if you
have tornado ≥ 3.0.
"""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

from __future__ import absolute_import, division, with_statement

import os
import time
import warnings

from zmq import (
    Poller,
    POLLIN, POLLOUT, POLLERR,
    ZMQError, ETERM,
)

try:
    import tornado
    tornado_version = tornado.version_info
except (ImportError, AttributeError):
    tornado_version = ()

try:
    # tornado ≥ 3
    from tornado.ioloop import PollIOLoop, PeriodicCallback
    from tornado.log import gen_log
except ImportError:
    from .minitornado.ioloop import PollIOLoop, PeriodicCallback
    from .minitornado.log import gen_log


class DelayedCallback(PeriodicCallback):
    """Schedules the given callback to be called once.

    The callback is called once, after callback_time milliseconds.

    `start` must be called after the DelayedCallback is created.
    
    The timeout is calculated from when `start` is called.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        # PeriodicCallback require callback_time to be positive
        warnings.warn("""DelayedCallback is deprecated.
        Use loop.add_timeout instead.""", DeprecationWarning)
        callback_time = max(callback_time, 1e-3)
        super(DelayedCallback, self).__init__(callback, callback_time, io_loop)
    
    def start(self):
        """Starts the timer."""
        self._running = True
        self._firstrun = True
        self._next_timeout = time.time() + self.callback_time / 1000.0
        self.io_loop.add_timeout(self._next_timeout, self._run)
    
    def _run(self):
        if not self._running: return
        self._running = False
        try:
            self.callback()
        except Exception:
            gen_log.error("Error in delayed callback", exc_info=True)


class ZMQPoller(object):
    """A poller that can be used in the tornado IOLoop.
    
    This simply wraps a regular zmq.Poller, scaling the timeout
    by 1000, so that it is in seconds rather than milliseconds.
    """
    
    def __init__(self):
        self._poller = Poller()
    
    @staticmethod
    def _map_events(events):
        """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
        z_events = 0
        if events & IOLoop.READ:
            z_events |= POLLIN
        if events & IOLoop.WRITE:
            z_events |= POLLOUT
        if events & IOLoop.ERROR:
            z_events |= POLLERR
        return z_events
    
    @staticmethod
    def _remap_events(z_events):
        """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
        events = 0
        if z_events & POLLIN:
            events |= IOLoop.READ
        if z_events & POLLOUT:
            events |= IOLoop.WRITE
        if z_events & POLLERR:
            events |= IOLoop.ERROR
        return events
    
    def register(self, fd, events):
        return self._poller.register(fd, self._map_events(events))
    
    def modify(self, fd, events):
        return self._poller.modify(fd, self._map_events(events))
    
    def unregister(self, fd):
        return self._poller.unregister(fd)
    
    def poll(self, timeout):
        """poll in seconds rather than milliseconds.
        
        Event masks will be IOLoop.READ/WRITE/ERROR
        """
        z_events = self._poller.poll(1000*timeout)
        return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ]
    
    def close(self):
        pass


class ZMQIOLoop(PollIOLoop):
    """ZMQ subclass of tornado's IOLoop"""
    def initialize(self, impl=None, **kwargs):
        impl = ZMQPoller() if impl is None else impl
        super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)
    
    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.
        
        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  To get the current thread's `IOLoop`, use `current()`.
        """
        # install ZMQIOLoop as the active IOLoop implementation
        # when using tornado 3
        if tornado_version >= (3,):
            PollIOLoop.configure(ZMQIOLoop)
        return PollIOLoop.instance()
    
    def start(self):
        try:
            super(ZMQIOLoop, self).start()
        except ZMQError as e:
            if e.errno == ETERM:
                # quietly return on ETERM
                pass
            else:
                raise e


if tornado_version >= (3,0) and tornado_version < (3,1):
    def backport_close(self, all_fds=False):
        """backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)"""
        from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop
        return mini_loop.close.__get__(self)(all_fds)
    ZMQIOLoop.close = backport_close


# public API name
IOLoop = ZMQIOLoop


def install():
    """set the tornado IOLoop instance with the pyzmq IOLoop.
    
    After calling this function, tornado's IOLoop.instance() and pyzmq's
    IOLoop.instance() will return the same object.
    
    An assertion error will be raised if tornado's IOLoop has been initialized
    prior to calling this function.
    """
    from tornado import ioloop
    # check if tornado's IOLoop is already initialized to something other
    # than the pyzmq IOLoop instance:
    assert (not ioloop.IOLoop.initialized()) or \
        ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized"
    
    if tornado_version >= (3,):
        # tornado 3 has an official API for registering new defaults, yay!
        ioloop.IOLoop.configure(ZMQIOLoop)
    else:
        # we have to set the global instance explicitly
        ioloop.IOLoop._instance = IOLoop.instance()