summaryrefslogtreecommitdiff
path: root/zmq/sugar/poll.py
diff options
context:
space:
mode:
Diffstat (limited to 'zmq/sugar/poll.py')
-rw-r--r--zmq/sugar/poll.py161
1 files changed, 161 insertions, 0 deletions
diff --git a/zmq/sugar/poll.py b/zmq/sugar/poll.py
new file mode 100644
index 0000000..c7b1d1b
--- /dev/null
+++ b/zmq/sugar/poll.py
@@ -0,0 +1,161 @@
+"""0MQ polling related functions and classes."""
+
+# Copyright (C) PyZMQ Developers
+# Distributed under the terms of the Modified BSD License.
+
+
+import zmq
+from zmq.backend import zmq_poll
+from .constants import POLLIN, POLLOUT, POLLERR
+
+#-----------------------------------------------------------------------------
+# Polling related methods
+#-----------------------------------------------------------------------------
+
+
+class Poller(object):
+ """A stateful poll interface that mirrors Python's built-in poll."""
+ sockets = None
+ _map = {}
+
+ def __init__(self):
+ self.sockets = []
+ self._map = {}
+
+ def __contains__(self, socket):
+ return socket in self._map
+
+ def register(self, socket, flags=POLLIN|POLLOUT):
+ """p.register(socket, flags=POLLIN|POLLOUT)
+
+ Register a 0MQ socket or native fd for I/O monitoring.
+
+ register(s,0) is equivalent to unregister(s).
+
+ Parameters
+ ----------
+ socket : zmq.Socket or native socket
+ A zmq.Socket or any Python object having a ``fileno()``
+ method that returns a valid file descriptor.
+ flags : int
+ The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
+ If `flags=0`, socket will be unregistered.
+ """
+ if flags:
+ if socket in self._map:
+ idx = self._map[socket]
+ self.sockets[idx] = (socket, flags)
+ else:
+ idx = len(self.sockets)
+ self.sockets.append((socket, flags))
+ self._map[socket] = idx
+ elif socket in self._map:
+ # uregister sockets registered with no events
+ self.unregister(socket)
+ else:
+ # ignore new sockets with no events
+ pass
+
+ def modify(self, socket, flags=POLLIN|POLLOUT):
+ """Modify the flags for an already registered 0MQ socket or native fd."""
+ self.register(socket, flags)
+
+ def unregister(self, socket):
+ """Remove a 0MQ socket or native fd for I/O monitoring.
+
+ Parameters
+ ----------
+ socket : Socket
+ The socket instance to stop polling.
+ """
+ idx = self._map.pop(socket)
+ self.sockets.pop(idx)
+ # shift indices after deletion
+ for socket, flags in self.sockets[idx:]:
+ self._map[socket] -= 1
+
+ def poll(self, timeout=None):
+ """Poll the registered 0MQ or native fds for I/O.
+
+ Parameters
+ ----------
+ timeout : float, int
+ The timeout in milliseconds. If None, no `timeout` (infinite). This
+ is in milliseconds to be compatible with ``select.poll()``. The
+ underlying zmq_poll uses microseconds and we convert to that in
+ this function.
+
+ Returns
+ -------
+ events : list of tuples
+ The list of events that are ready to be processed.
+ This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
+ or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
+ It is common to call ``events = dict(poller.poll())``,
+ which turns the list of tuples into a mapping of ``socket : event``.
+ """
+ if timeout is None or timeout < 0:
+ timeout = -1
+ elif isinstance(timeout, float):
+ timeout = int(timeout)
+ return zmq_poll(self.sockets, timeout=timeout)
+
+
+def select(rlist, wlist, xlist, timeout=None):
+ """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)
+
+ Return the result of poll as a lists of sockets ready for r/w/exception.
+
+ This has the same interface as Python's built-in ``select.select()`` function.
+
+ Parameters
+ ----------
+ timeout : float, int, optional
+ The timeout in seconds. If None, no timeout (infinite). This is in seconds to be
+ compatible with ``select.select()``. The underlying zmq_poll uses microseconds
+ and we convert to that in this function.
+ rlist : list of sockets/FDs
+ sockets/FDs to be polled for read events
+ wlist : list of sockets/FDs
+ sockets/FDs to be polled for write events
+ xlist : list of sockets/FDs
+ sockets/FDs to be polled for error events
+
+ Returns
+ -------
+ (rlist, wlist, xlist) : tuple of lists of sockets (length 3)
+ Lists correspond to sockets available for read/write/error events respectively.
+ """
+ if timeout is None:
+ timeout = -1
+ # Convert from sec -> us for zmq_poll.
+ # zmq_poll accepts 3.x style timeout in ms
+ timeout = int(timeout*1000.0)
+ if timeout < 0:
+ timeout = -1
+ sockets = []
+ for s in set(rlist + wlist + xlist):
+ flags = 0
+ if s in rlist:
+ flags |= POLLIN
+ if s in wlist:
+ flags |= POLLOUT
+ if s in xlist:
+ flags |= POLLERR
+ sockets.append((s, flags))
+ return_sockets = zmq_poll(sockets, timeout)
+ rlist, wlist, xlist = [], [], []
+ for s, flags in return_sockets:
+ if flags & POLLIN:
+ rlist.append(s)
+ if flags & POLLOUT:
+ wlist.append(s)
+ if flags & POLLERR:
+ xlist.append(s)
+ return rlist, wlist, xlist
+
+#-----------------------------------------------------------------------------
+# Symbols to export
+#-----------------------------------------------------------------------------
+
+__all__ = [ 'Poller', 'select' ]