summaryrefslogtreecommitdiff
path: root/zmq/backend/cython/_poll.pyx
blob: 5bed46b6075474a49cef18a7093dcf44625157ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""0MQ polling related functions and classes."""

#
#    Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
#
#    This file is part of pyzmq.
#
#    pyzmq is free software; you can redistribute it and/or modify it under
#    the terms of the Lesser GNU General Public License as published by
#    the Free Software Foundation; either version 3 of the License, or
#    (at your option) any later version.
#
#    pyzmq is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    Lesser GNU General Public License for more details.
#
#    You should have received a copy of the Lesser GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from libc.stdlib cimport free, malloc

from libzmq cimport zmq_pollitem_t, ZMQ_VERSION_MAJOR
from libzmq cimport zmq_poll as zmq_poll_c
from socket cimport Socket

import sys

from zmq.backend.cython.checkrc cimport _check_rc

#-----------------------------------------------------------------------------
# Polling related methods
#-----------------------------------------------------------------------------

# version-independent typecheck for int/long
if sys.version_info[0] >= 3:
    int_t = int
else:
    int_t = (int,long)


def zmq_poll(sockets, long timeout=-1):
    """zmq_poll(sockets, timeout=-1)

    Poll a set of 0MQ sockets, native file descs. or sockets.

    Parameters
    ----------
    sockets : list of tuples of (socket, flags)
        Each element of this list is a two-tuple containing a socket
        and a flags. The socket may be a 0MQ socket or any object with
        a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting
        for incoming messages), zmq.POLLOUT (for detecting that send is OK)
        or zmq.POLLIN|zmq.POLLOUT for detecting both.
    timeout : int
        The number of milliseconds to poll for. Negative means no timeout.
    """
    cdef int rc, i
    cdef zmq_pollitem_t *pollitems = NULL
    cdef int nsockets = <int>len(sockets)
    cdef Socket current_socket
    
    if nsockets == 0:
        return []
    
    pollitems = <zmq_pollitem_t *>malloc(nsockets*sizeof(zmq_pollitem_t))
    if pollitems == NULL:
        raise MemoryError("Could not allocate poll items")
        
    if ZMQ_VERSION_MAJOR < 3:
        # timeout is us in 2.x, ms in 3.x
        # expected input is ms (matches 3.x)
        timeout = 1000*timeout
    
    for i in range(nsockets):
        s, events = sockets[i]
        if isinstance(s, Socket):
            pollitems[i].socket = (<Socket>s).handle
            pollitems[i].events = events
            pollitems[i].revents = 0
        elif isinstance(s, int_t):
            pollitems[i].socket = NULL
            pollitems[i].fd = s
            pollitems[i].events = events
            pollitems[i].revents = 0
        elif hasattr(s, 'fileno'):
            try:
                fileno = int(s.fileno())
            except:
                free(pollitems)
                raise ValueError('fileno() must return a valid integer fd')
            else:
                pollitems[i].socket = NULL
                pollitems[i].fd = fileno
                pollitems[i].events = events
                pollitems[i].revents = 0
        else:
            free(pollitems)
            raise TypeError(
                "Socket must be a 0MQ socket, an integer fd or have "
                "a fileno() method: %r" % s
            )
    

    with nogil:
        rc = zmq_poll_c(pollitems, nsockets, timeout)
    
    if rc < 0:
        free(pollitems)
        _check_rc(rc)
    
    results = []
    for i in range(nsockets):
        revents = pollitems[i].revents
        # for compatibility with select.poll:
        # - only return sockets with non-zero status
        # - return the fd for plain sockets
        if revents > 0:
            if pollitems[i].socket != NULL:
                s = sockets[i][0]
            else:
                s = pollitems[i].fd
            results.append((s, revents))

    free(pollitems)
    return results

#-----------------------------------------------------------------------------
# Symbols to export
#-----------------------------------------------------------------------------

__all__ = [ 'zmq_poll' ]