From cce638a8adf4e045ca5505afea4bda57753c31dd Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Mon, 11 Aug 2014 16:33:29 -0400 Subject: initial import of debian package --- zmq/backend/cython/__init__.py | 23 ++ zmq/backend/cython/_device.pyx | 89 +++++ zmq/backend/cython/_poll.pyx | 137 +++++++ zmq/backend/cython/_version.pyx | 43 +++ zmq/backend/cython/checkrc.pxd | 23 ++ zmq/backend/cython/constant_enums.pxi | 137 +++++++ zmq/backend/cython/constants.pxi | 280 ++++++++++++++ zmq/backend/cython/constants.pyx | 32 ++ zmq/backend/cython/context.pxd | 41 +++ zmq/backend/cython/context.pyx | 243 ++++++++++++ zmq/backend/cython/error.pyx | 56 +++ zmq/backend/cython/libzmq.pxd | 107 ++++++ zmq/backend/cython/message.pxd | 63 ++++ zmq/backend/cython/message.pyx | 356 ++++++++++++++++++ zmq/backend/cython/rebuffer.pyx | 104 ++++++ zmq/backend/cython/socket.pxd | 47 +++ zmq/backend/cython/socket.pyx | 669 ++++++++++++++++++++++++++++++++++ zmq/backend/cython/utils.pxd | 29 ++ zmq/backend/cython/utils.pyx | 111 ++++++ 19 files changed, 2590 insertions(+) create mode 100644 zmq/backend/cython/__init__.py create mode 100644 zmq/backend/cython/_device.pyx create mode 100644 zmq/backend/cython/_poll.pyx create mode 100644 zmq/backend/cython/_version.pyx create mode 100644 zmq/backend/cython/checkrc.pxd create mode 100644 zmq/backend/cython/constant_enums.pxi create mode 100644 zmq/backend/cython/constants.pxi create mode 100644 zmq/backend/cython/constants.pyx create mode 100644 zmq/backend/cython/context.pxd create mode 100644 zmq/backend/cython/context.pyx create mode 100644 zmq/backend/cython/error.pyx create mode 100644 zmq/backend/cython/libzmq.pxd create mode 100644 zmq/backend/cython/message.pxd create mode 100644 zmq/backend/cython/message.pyx create mode 100644 zmq/backend/cython/rebuffer.pyx create mode 100644 zmq/backend/cython/socket.pxd create mode 100644 zmq/backend/cython/socket.pyx create mode 100644 zmq/backend/cython/utils.pxd create mode 100644 zmq/backend/cython/utils.pyx (limited to 'zmq/backend/cython') diff --git a/zmq/backend/cython/__init__.py b/zmq/backend/cython/__init__.py new file mode 100644 index 0000000..e535818 --- /dev/null +++ b/zmq/backend/cython/__init__.py @@ -0,0 +1,23 @@ +"""Python bindings for core 0MQ objects.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Lesser GNU Public License (LGPL). + +from . import (constants, error, message, context, + socket, utils, _poll, _version, _device ) + +__all__ = [] +for submod in (constants, error, message, context, + socket, utils, _poll, _version, _device): + __all__.extend(submod.__all__) + +from .constants import * +from .error import * +from .message import * +from .context import * +from .socket import * +from ._poll import * +from .utils import * +from ._device import * +from ._version import * + diff --git a/zmq/backend/cython/_device.pyx b/zmq/backend/cython/_device.pyx new file mode 100644 index 0000000..eea0a00 --- /dev/null +++ b/zmq/backend/cython/_device.pyx @@ -0,0 +1,89 @@ +"""Python binding for 0MQ device function.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport zmq_device, zmq_proxy, ZMQ_VERSION_MAJOR +from zmq.backend.cython.socket cimport Socket as cSocket +from zmq.backend.cython.checkrc cimport _check_rc + +#----------------------------------------------------------------------------- +# Basic device API +#----------------------------------------------------------------------------- + +def device(int device_type, cSocket frontend, cSocket backend=None): + """device(device_type, frontend, backend) + + Start a zeromq device. + + .. deprecated:: libzmq-3.2 + Use zmq.proxy + + Parameters + ---------- + device_type : (QUEUE, FORWARDER, STREAMER) + The type of device to start. + frontend : Socket + The Socket instance for the incoming traffic. + backend : Socket + The Socket instance for the outbound traffic. + """ + if ZMQ_VERSION_MAJOR >= 3: + return proxy(frontend, backend) + + cdef int rc = 0 + with nogil: + rc = zmq_device(device_type, frontend.handle, backend.handle) + _check_rc(rc) + return rc + +def proxy(cSocket frontend, cSocket backend, cSocket capture=None): + """proxy(frontend, backend, capture) + + Start a zeromq proxy (replacement for device). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + frontend : Socket + The Socket instance for the incoming traffic. + backend : Socket + The Socket instance for the outbound traffic. + capture : Socket (optional) + The Socket instance for capturing traffic. + """ + cdef int rc = 0 + cdef void* capture_handle + if isinstance(capture, cSocket): + capture_handle = capture.handle + else: + capture_handle = NULL + with nogil: + rc = zmq_proxy(frontend.handle, backend.handle, capture_handle) + _check_rc(rc) + return rc + +__all__ = ['device', 'proxy'] + diff --git a/zmq/backend/cython/_poll.pyx b/zmq/backend/cython/_poll.pyx new file mode 100644 index 0000000..5bed46b --- /dev/null +++ b/zmq/backend/cython/_poll.pyx @@ -0,0 +1,137 @@ +"""0MQ polling related functions and classes.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libc.stdlib cimport free, malloc + +from libzmq cimport zmq_pollitem_t, ZMQ_VERSION_MAJOR +from libzmq cimport zmq_poll as zmq_poll_c +from socket cimport Socket + +import sys + +from zmq.backend.cython.checkrc cimport _check_rc + +#----------------------------------------------------------------------------- +# Polling related methods +#----------------------------------------------------------------------------- + +# version-independent typecheck for int/long +if sys.version_info[0] >= 3: + int_t = int +else: + int_t = (int,long) + + +def zmq_poll(sockets, long timeout=-1): + """zmq_poll(sockets, timeout=-1) + + Poll a set of 0MQ sockets, native file descs. or sockets. + + Parameters + ---------- + sockets : list of tuples of (socket, flags) + Each element of this list is a two-tuple containing a socket + and a flags. The socket may be a 0MQ socket or any object with + a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting + for incoming messages), zmq.POLLOUT (for detecting that send is OK) + or zmq.POLLIN|zmq.POLLOUT for detecting both. + timeout : int + The number of milliseconds to poll for. Negative means no timeout. + """ + cdef int rc, i + cdef zmq_pollitem_t *pollitems = NULL + cdef int nsockets = len(sockets) + cdef Socket current_socket + + if nsockets == 0: + return [] + + pollitems = malloc(nsockets*sizeof(zmq_pollitem_t)) + if pollitems == NULL: + raise MemoryError("Could not allocate poll items") + + if ZMQ_VERSION_MAJOR < 3: + # timeout is us in 2.x, ms in 3.x + # expected input is ms (matches 3.x) + timeout = 1000*timeout + + for i in range(nsockets): + s, events = sockets[i] + if isinstance(s, Socket): + pollitems[i].socket = (s).handle + pollitems[i].events = events + pollitems[i].revents = 0 + elif isinstance(s, int_t): + pollitems[i].socket = NULL + pollitems[i].fd = s + pollitems[i].events = events + pollitems[i].revents = 0 + elif hasattr(s, 'fileno'): + try: + fileno = int(s.fileno()) + except: + free(pollitems) + raise ValueError('fileno() must return a valid integer fd') + else: + pollitems[i].socket = NULL + pollitems[i].fd = fileno + pollitems[i].events = events + pollitems[i].revents = 0 + else: + free(pollitems) + raise TypeError( + "Socket must be a 0MQ socket, an integer fd or have " + "a fileno() method: %r" % s + ) + + + with nogil: + rc = zmq_poll_c(pollitems, nsockets, timeout) + + if rc < 0: + free(pollitems) + _check_rc(rc) + + results = [] + for i in range(nsockets): + revents = pollitems[i].revents + # for compatibility with select.poll: + # - only return sockets with non-zero status + # - return the fd for plain sockets + if revents > 0: + if pollitems[i].socket != NULL: + s = sockets[i][0] + else: + s = pollitems[i].fd + results.append((s, revents)) + + free(pollitems) + return results + +#----------------------------------------------------------------------------- +# Symbols to export +#----------------------------------------------------------------------------- + +__all__ = [ 'zmq_poll' ] diff --git a/zmq/backend/cython/_version.pyx b/zmq/backend/cython/_version.pyx new file mode 100644 index 0000000..02cf6fc --- /dev/null +++ b/zmq/backend/cython/_version.pyx @@ -0,0 +1,43 @@ +"""PyZMQ and 0MQ version functions.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport _zmq_version + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +def zmq_version_info(): + """zmq_version_info() + + Return the version of ZeroMQ itself as a 3-tuple of ints. + """ + cdef int major, minor, patch + _zmq_version(&major, &minor, &patch) + return (major, minor, patch) + + +__all__ = ['zmq_version_info'] + diff --git a/zmq/backend/cython/checkrc.pxd b/zmq/backend/cython/checkrc.pxd new file mode 100644 index 0000000..3bf69fc --- /dev/null +++ b/zmq/backend/cython/checkrc.pxd @@ -0,0 +1,23 @@ +from libc.errno cimport EINTR, EAGAIN +from cpython cimport PyErr_CheckSignals +from libzmq cimport zmq_errno, ZMQ_ETERM + +cdef inline int _check_rc(int rc) except -1: + """internal utility for checking zmq return condition + + and raising the appropriate Exception class + """ + cdef int errno = zmq_errno() + PyErr_CheckSignals() + if rc < 0: + if errno == EAGAIN: + from zmq.error import Again + raise Again(errno) + elif errno == ZMQ_ETERM: + from zmq.error import ContextTerminated + raise ContextTerminated(errno) + else: + from zmq.error import ZMQError + raise ZMQError(errno) + # return -1 + return 0 diff --git a/zmq/backend/cython/constant_enums.pxi b/zmq/backend/cython/constant_enums.pxi new file mode 100644 index 0000000..a34d9a4 --- /dev/null +++ b/zmq/backend/cython/constant_enums.pxi @@ -0,0 +1,137 @@ +cdef extern from "zmq.h" nogil: + + enum: ZMQ_VERSION + enum: ZMQ_VERSION_MAJOR + enum: ZMQ_VERSION_MINOR + enum: ZMQ_VERSION_PATCH + enum: ZMQ_NOBLOCK + enum: ZMQ_DONTWAIT + enum: ZMQ_POLLIN + enum: ZMQ_POLLOUT + enum: ZMQ_POLLERR + enum: ZMQ_SNDMORE + enum: ZMQ_STREAMER + enum: ZMQ_FORWARDER + enum: ZMQ_QUEUE + enum: ZMQ_IO_THREADS_DFLT + enum: ZMQ_MAX_SOCKETS_DFLT + enum: ZMQ_PAIR + enum: ZMQ_PUB + enum: ZMQ_SUB + enum: ZMQ_REQ + enum: ZMQ_REP + enum: ZMQ_DEALER + enum: ZMQ_ROUTER + enum: ZMQ_PULL + enum: ZMQ_PUSH + enum: ZMQ_XPUB + enum: ZMQ_XSUB + enum: ZMQ_UPSTREAM + enum: ZMQ_DOWNSTREAM + enum: ZMQ_STREAM + enum: ZMQ_EVENT_CONNECTED + enum: ZMQ_EVENT_CONNECT_DELAYED + enum: ZMQ_EVENT_CONNECT_RETRIED + enum: ZMQ_EVENT_LISTENING + enum: ZMQ_EVENT_BIND_FAILED + enum: ZMQ_EVENT_ACCEPTED + enum: ZMQ_EVENT_ACCEPT_FAILED + enum: ZMQ_EVENT_CLOSED + enum: ZMQ_EVENT_CLOSE_FAILED + enum: ZMQ_EVENT_DISCONNECTED + enum: ZMQ_EVENT_ALL + enum: ZMQ_EVENT_MONITOR_STOPPED + enum: ZMQ_NULL + enum: ZMQ_PLAIN + enum: ZMQ_CURVE + enum: ZMQ_EAGAIN "EAGAIN" + enum: ZMQ_EINVAL "EINVAL" + enum: ZMQ_EFAULT "EFAULT" + enum: ZMQ_ENOMEM "ENOMEM" + enum: ZMQ_ENODEV "ENODEV" + enum: ZMQ_EMSGSIZE "EMSGSIZE" + enum: ZMQ_EAFNOSUPPORT "EAFNOSUPPORT" + enum: ZMQ_ENETUNREACH "ENETUNREACH" + enum: ZMQ_ECONNABORTED "ECONNABORTED" + enum: ZMQ_ECONNRESET "ECONNRESET" + enum: ZMQ_ENOTCONN "ENOTCONN" + enum: ZMQ_ETIMEDOUT "ETIMEDOUT" + enum: ZMQ_EHOSTUNREACH "EHOSTUNREACH" + enum: ZMQ_ENETRESET "ENETRESET" + enum: ZMQ_HAUSNUMERO + enum: ZMQ_ENOTSUP "ENOTSUP" + enum: ZMQ_EPROTONOSUPPORT "EPROTONOSUPPORT" + enum: ZMQ_ENOBUFS "ENOBUFS" + enum: ZMQ_ENETDOWN "ENETDOWN" + enum: ZMQ_EADDRINUSE "EADDRINUSE" + enum: ZMQ_EADDRNOTAVAIL "EADDRNOTAVAIL" + enum: ZMQ_ECONNREFUSED "ECONNREFUSED" + enum: ZMQ_EINPROGRESS "EINPROGRESS" + enum: ZMQ_ENOTSOCK "ENOTSOCK" + enum: ZMQ_EFSM "EFSM" + enum: ZMQ_ENOCOMPATPROTO "ENOCOMPATPROTO" + enum: ZMQ_ETERM "ETERM" + enum: ZMQ_EMTHREAD "EMTHREAD" + enum: ZMQ_IO_THREADS + enum: ZMQ_MAX_SOCKETS + enum: ZMQ_MORE + enum: ZMQ_IDENTITY + enum: ZMQ_SUBSCRIBE + enum: ZMQ_UNSUBSCRIBE + enum: ZMQ_LAST_ENDPOINT + enum: ZMQ_TCP_ACCEPT_FILTER + enum: ZMQ_PLAIN_USERNAME + enum: ZMQ_PLAIN_PASSWORD + enum: ZMQ_CURVE_PUBLICKEY + enum: ZMQ_CURVE_SECRETKEY + enum: ZMQ_CURVE_SERVERKEY + enum: ZMQ_ZAP_DOMAIN + enum: ZMQ_CONNECT_RID + enum: ZMQ_RECONNECT_IVL_MAX + enum: ZMQ_SNDTIMEO + enum: ZMQ_RCVTIMEO + enum: ZMQ_SNDHWM + enum: ZMQ_RCVHWM + enum: ZMQ_MULTICAST_HOPS + enum: ZMQ_IPV4ONLY + enum: ZMQ_ROUTER_BEHAVIOR + enum: ZMQ_TCP_KEEPALIVE + enum: ZMQ_TCP_KEEPALIVE_CNT + enum: ZMQ_TCP_KEEPALIVE_IDLE + enum: ZMQ_TCP_KEEPALIVE_INTVL + enum: ZMQ_DELAY_ATTACH_ON_CONNECT + enum: ZMQ_XPUB_VERBOSE + enum: ZMQ_FD + enum: ZMQ_EVENTS + enum: ZMQ_TYPE + enum: ZMQ_LINGER + enum: ZMQ_RECONNECT_IVL + enum: ZMQ_BACKLOG + enum: ZMQ_ROUTER_MANDATORY + enum: ZMQ_FAIL_UNROUTABLE + enum: ZMQ_ROUTER_RAW + enum: ZMQ_IMMEDIATE + enum: ZMQ_IPV6 + enum: ZMQ_MECHANISM + enum: ZMQ_PLAIN_SERVER + enum: ZMQ_CURVE_SERVER + enum: ZMQ_PROBE_ROUTER + enum: ZMQ_REQ_RELAXED + enum: ZMQ_REQ_CORRELATE + enum: ZMQ_CONFLATE + enum: ZMQ_ROUTER_HANDOVER + enum: ZMQ_TOS + enum: ZMQ_IPC_FILTER_PID + enum: ZMQ_IPC_FILTER_UID + enum: ZMQ_IPC_FILTER_GID + enum: ZMQ_AFFINITY + enum: ZMQ_MAXMSGSIZE + enum: ZMQ_HWM + enum: ZMQ_SWAP + enum: ZMQ_MCAST_LOOP + enum: ZMQ_RECOVERY_IVL_MSEC + enum: ZMQ_RATE + enum: ZMQ_RECOVERY_IVL + enum: ZMQ_SNDBUF + enum: ZMQ_RCVBUF + enum: ZMQ_RCVMORE diff --git a/zmq/backend/cython/constants.pxi b/zmq/backend/cython/constants.pxi new file mode 100644 index 0000000..983cfd7 --- /dev/null +++ b/zmq/backend/cython/constants.pxi @@ -0,0 +1,280 @@ +#----------------------------------------------------------------------------- +# Python module level constants +#----------------------------------------------------------------------------- + +VERSION = ZMQ_VERSION +VERSION_MAJOR = ZMQ_VERSION_MAJOR +VERSION_MINOR = ZMQ_VERSION_MINOR +VERSION_PATCH = ZMQ_VERSION_PATCH +NOBLOCK = ZMQ_NOBLOCK +DONTWAIT = ZMQ_DONTWAIT +POLLIN = ZMQ_POLLIN +POLLOUT = ZMQ_POLLOUT +POLLERR = ZMQ_POLLERR +SNDMORE = ZMQ_SNDMORE +STREAMER = ZMQ_STREAMER +FORWARDER = ZMQ_FORWARDER +QUEUE = ZMQ_QUEUE +IO_THREADS_DFLT = ZMQ_IO_THREADS_DFLT +MAX_SOCKETS_DFLT = ZMQ_MAX_SOCKETS_DFLT +PAIR = ZMQ_PAIR +PUB = ZMQ_PUB +SUB = ZMQ_SUB +REQ = ZMQ_REQ +REP = ZMQ_REP +DEALER = ZMQ_DEALER +ROUTER = ZMQ_ROUTER +PULL = ZMQ_PULL +PUSH = ZMQ_PUSH +XPUB = ZMQ_XPUB +XSUB = ZMQ_XSUB +UPSTREAM = ZMQ_UPSTREAM +DOWNSTREAM = ZMQ_DOWNSTREAM +STREAM = ZMQ_STREAM +EVENT_CONNECTED = ZMQ_EVENT_CONNECTED +EVENT_CONNECT_DELAYED = ZMQ_EVENT_CONNECT_DELAYED +EVENT_CONNECT_RETRIED = ZMQ_EVENT_CONNECT_RETRIED +EVENT_LISTENING = ZMQ_EVENT_LISTENING +EVENT_BIND_FAILED = ZMQ_EVENT_BIND_FAILED +EVENT_ACCEPTED = ZMQ_EVENT_ACCEPTED +EVENT_ACCEPT_FAILED = ZMQ_EVENT_ACCEPT_FAILED +EVENT_CLOSED = ZMQ_EVENT_CLOSED +EVENT_CLOSE_FAILED = ZMQ_EVENT_CLOSE_FAILED +EVENT_DISCONNECTED = ZMQ_EVENT_DISCONNECTED +EVENT_ALL = ZMQ_EVENT_ALL +EVENT_MONITOR_STOPPED = ZMQ_EVENT_MONITOR_STOPPED +globals()['NULL'] = ZMQ_NULL +PLAIN = ZMQ_PLAIN +CURVE = ZMQ_CURVE +EAGAIN = ZMQ_EAGAIN +EINVAL = ZMQ_EINVAL +EFAULT = ZMQ_EFAULT +ENOMEM = ZMQ_ENOMEM +ENODEV = ZMQ_ENODEV +EMSGSIZE = ZMQ_EMSGSIZE +EAFNOSUPPORT = ZMQ_EAFNOSUPPORT +ENETUNREACH = ZMQ_ENETUNREACH +ECONNABORTED = ZMQ_ECONNABORTED +ECONNRESET = ZMQ_ECONNRESET +ENOTCONN = ZMQ_ENOTCONN +ETIMEDOUT = ZMQ_ETIMEDOUT +EHOSTUNREACH = ZMQ_EHOSTUNREACH +ENETRESET = ZMQ_ENETRESET +HAUSNUMERO = ZMQ_HAUSNUMERO +ENOTSUP = ZMQ_ENOTSUP +EPROTONOSUPPORT = ZMQ_EPROTONOSUPPORT +ENOBUFS = ZMQ_ENOBUFS +ENETDOWN = ZMQ_ENETDOWN +EADDRINUSE = ZMQ_EADDRINUSE +EADDRNOTAVAIL = ZMQ_EADDRNOTAVAIL +ECONNREFUSED = ZMQ_ECONNREFUSED +EINPROGRESS = ZMQ_EINPROGRESS +ENOTSOCK = ZMQ_ENOTSOCK +EFSM = ZMQ_EFSM +ENOCOMPATPROTO = ZMQ_ENOCOMPATPROTO +ETERM = ZMQ_ETERM +EMTHREAD = ZMQ_EMTHREAD +IO_THREADS = ZMQ_IO_THREADS +MAX_SOCKETS = ZMQ_MAX_SOCKETS +MORE = ZMQ_MORE +IDENTITY = ZMQ_IDENTITY +SUBSCRIBE = ZMQ_SUBSCRIBE +UNSUBSCRIBE = ZMQ_UNSUBSCRIBE +LAST_ENDPOINT = ZMQ_LAST_ENDPOINT +TCP_ACCEPT_FILTER = ZMQ_TCP_ACCEPT_FILTER +PLAIN_USERNAME = ZMQ_PLAIN_USERNAME +PLAIN_PASSWORD = ZMQ_PLAIN_PASSWORD +CURVE_PUBLICKEY = ZMQ_CURVE_PUBLICKEY +CURVE_SECRETKEY = ZMQ_CURVE_SECRETKEY +CURVE_SERVERKEY = ZMQ_CURVE_SERVERKEY +ZAP_DOMAIN = ZMQ_ZAP_DOMAIN +CONNECT_RID = ZMQ_CONNECT_RID +RECONNECT_IVL_MAX = ZMQ_RECONNECT_IVL_MAX +SNDTIMEO = ZMQ_SNDTIMEO +RCVTIMEO = ZMQ_RCVTIMEO +SNDHWM = ZMQ_SNDHWM +RCVHWM = ZMQ_RCVHWM +MULTICAST_HOPS = ZMQ_MULTICAST_HOPS +IPV4ONLY = ZMQ_IPV4ONLY +ROUTER_BEHAVIOR = ZMQ_ROUTER_BEHAVIOR +TCP_KEEPALIVE = ZMQ_TCP_KEEPALIVE +TCP_KEEPALIVE_CNT = ZMQ_TCP_KEEPALIVE_CNT +TCP_KEEPALIVE_IDLE = ZMQ_TCP_KEEPALIVE_IDLE +TCP_KEEPALIVE_INTVL = ZMQ_TCP_KEEPALIVE_INTVL +DELAY_ATTACH_ON_CONNECT = ZMQ_DELAY_ATTACH_ON_CONNECT +XPUB_VERBOSE = ZMQ_XPUB_VERBOSE +FD = ZMQ_FD +EVENTS = ZMQ_EVENTS +TYPE = ZMQ_TYPE +LINGER = ZMQ_LINGER +RECONNECT_IVL = ZMQ_RECONNECT_IVL +BACKLOG = ZMQ_BACKLOG +ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY +FAIL_UNROUTABLE = ZMQ_FAIL_UNROUTABLE +ROUTER_RAW = ZMQ_ROUTER_RAW +IMMEDIATE = ZMQ_IMMEDIATE +IPV6 = ZMQ_IPV6 +MECHANISM = ZMQ_MECHANISM +PLAIN_SERVER = ZMQ_PLAIN_SERVER +CURVE_SERVER = ZMQ_CURVE_SERVER +PROBE_ROUTER = ZMQ_PROBE_ROUTER +REQ_RELAXED = ZMQ_REQ_RELAXED +REQ_CORRELATE = ZMQ_REQ_CORRELATE +CONFLATE = ZMQ_CONFLATE +ROUTER_HANDOVER = ZMQ_ROUTER_HANDOVER +TOS = ZMQ_TOS +IPC_FILTER_PID = ZMQ_IPC_FILTER_PID +IPC_FILTER_UID = ZMQ_IPC_FILTER_UID +IPC_FILTER_GID = ZMQ_IPC_FILTER_GID +AFFINITY = ZMQ_AFFINITY +MAXMSGSIZE = ZMQ_MAXMSGSIZE +HWM = ZMQ_HWM +SWAP = ZMQ_SWAP +MCAST_LOOP = ZMQ_MCAST_LOOP +RECOVERY_IVL_MSEC = ZMQ_RECOVERY_IVL_MSEC +RATE = ZMQ_RATE +RECOVERY_IVL = ZMQ_RECOVERY_IVL +SNDBUF = ZMQ_SNDBUF +RCVBUF = ZMQ_RCVBUF +RCVMORE = ZMQ_RCVMORE + +#----------------------------------------------------------------------------- +# Symbols to export +#----------------------------------------------------------------------------- +__all__ = [ + "VERSION", + "VERSION_MAJOR", + "VERSION_MINOR", + "VERSION_PATCH", + "NOBLOCK", + "DONTWAIT", + "POLLIN", + "POLLOUT", + "POLLERR", + "SNDMORE", + "STREAMER", + "FORWARDER", + "QUEUE", + "IO_THREADS_DFLT", + "MAX_SOCKETS_DFLT", + "PAIR", + "PUB", + "SUB", + "REQ", + "REP", + "DEALER", + "ROUTER", + "PULL", + "PUSH", + "XPUB", + "XSUB", + "UPSTREAM", + "DOWNSTREAM", + "STREAM", + "EVENT_CONNECTED", + "EVENT_CONNECT_DELAYED", + "EVENT_CONNECT_RETRIED", + "EVENT_LISTENING", + "EVENT_BIND_FAILED", + "EVENT_ACCEPTED", + "EVENT_ACCEPT_FAILED", + "EVENT_CLOSED", + "EVENT_CLOSE_FAILED", + "EVENT_DISCONNECTED", + "EVENT_ALL", + "EVENT_MONITOR_STOPPED", + "NULL", + "PLAIN", + "CURVE", + "EAGAIN", + "EINVAL", + "EFAULT", + "ENOMEM", + "ENODEV", + "EMSGSIZE", + "EAFNOSUPPORT", + "ENETUNREACH", + "ECONNABORTED", + "ECONNRESET", + "ENOTCONN", + "ETIMEDOUT", + "EHOSTUNREACH", + "ENETRESET", + "HAUSNUMERO", + "ENOTSUP", + "EPROTONOSUPPORT", + "ENOBUFS", + "ENETDOWN", + "EADDRINUSE", + "EADDRNOTAVAIL", + "ECONNREFUSED", + "EINPROGRESS", + "ENOTSOCK", + "EFSM", + "ENOCOMPATPROTO", + "ETERM", + "EMTHREAD", + "IO_THREADS", + "MAX_SOCKETS", + "MORE", + "IDENTITY", + "SUBSCRIBE", + "UNSUBSCRIBE", + "LAST_ENDPOINT", + "TCP_ACCEPT_FILTER", + "PLAIN_USERNAME", + "PLAIN_PASSWORD", + "CURVE_PUBLICKEY", + "CURVE_SECRETKEY", + "CURVE_SERVERKEY", + "ZAP_DOMAIN", + "CONNECT_RID", + "RECONNECT_IVL_MAX", + "SNDTIMEO", + "RCVTIMEO", + "SNDHWM", + "RCVHWM", + "MULTICAST_HOPS", + "IPV4ONLY", + "ROUTER_BEHAVIOR", + "TCP_KEEPALIVE", + "TCP_KEEPALIVE_CNT", + "TCP_KEEPALIVE_IDLE", + "TCP_KEEPALIVE_INTVL", + "DELAY_ATTACH_ON_CONNECT", + "XPUB_VERBOSE", + "FD", + "EVENTS", + "TYPE", + "LINGER", + "RECONNECT_IVL", + "BACKLOG", + "ROUTER_MANDATORY", + "FAIL_UNROUTABLE", + "ROUTER_RAW", + "IMMEDIATE", + "IPV6", + "MECHANISM", + "PLAIN_SERVER", + "CURVE_SERVER", + "PROBE_ROUTER", + "REQ_RELAXED", + "REQ_CORRELATE", + "CONFLATE", + "ROUTER_HANDOVER", + "TOS", + "IPC_FILTER_PID", + "IPC_FILTER_UID", + "IPC_FILTER_GID", + "AFFINITY", + "MAXMSGSIZE", + "HWM", + "SWAP", + "MCAST_LOOP", + "RECOVERY_IVL_MSEC", + "RATE", + "RECOVERY_IVL", + "SNDBUF", + "RCVBUF", + "RCVMORE", +] diff --git a/zmq/backend/cython/constants.pyx b/zmq/backend/cython/constants.pyx new file mode 100644 index 0000000..f924f03 --- /dev/null +++ b/zmq/backend/cython/constants.pyx @@ -0,0 +1,32 @@ +"""0MQ Constants.""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport * + +#----------------------------------------------------------------------------- +# Python module level constants +#----------------------------------------------------------------------------- + +include "constants.pxi" diff --git a/zmq/backend/cython/context.pxd b/zmq/backend/cython/context.pxd new file mode 100644 index 0000000..9c9267a --- /dev/null +++ b/zmq/backend/cython/context.pxd @@ -0,0 +1,41 @@ +"""0MQ Context class declaration.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +cdef class Context: + + cdef object __weakref__ # enable weakref + cdef void *handle # The C handle for the underlying zmq object. + cdef bint _shadow # whether the Context is a shadow wrapper of another + cdef void **_sockets # A C-array containg socket handles + cdef size_t _n_sockets # the number of sockets + cdef size_t _max_sockets # the size of the _sockets array + cdef int _pid # the pid of the process which created me (for fork safety) + + cdef public bint closed # bool property for a closed context. + cdef inline int _term(self) + # helpers for events on _sockets in Socket.__cinit__()/close() + cdef inline void _add_socket(self, void* handle) + cdef inline void _remove_socket(self, void* handle) + diff --git a/zmq/backend/cython/context.pyx b/zmq/backend/cython/context.pyx new file mode 100644 index 0000000..b527e5d --- /dev/null +++ b/zmq/backend/cython/context.pyx @@ -0,0 +1,243 @@ +"""0MQ Context class.""" +# coding: utf-8 + +# Copyright (c) PyZMQ Developers. +# Distributed under the terms of the Lesser GNU Public License (LGPL). + +from libc.stdlib cimport free, malloc, realloc + +from libzmq cimport * + +cdef extern from "getpid_compat.h": + int getpid() + +from zmq.error import ZMQError +from zmq.backend.cython.checkrc cimport _check_rc + + +_instance = None + +cdef class Context: + """Context(io_threads=1) + + Manage the lifecycle of a 0MQ context. + + Parameters + ---------- + io_threads : int + The number of IO threads. + """ + + # no-op for the signature + def __init__(self, io_threads=1, shadow=0): + pass + + def __cinit__(self, int io_threads=1, size_t shadow=0, **kwargs): + self.handle = NULL + self._sockets = NULL + if shadow: + self.handle = shadow + self._shadow = True + else: + self._shadow = False + if ZMQ_VERSION_MAJOR >= 3: + self.handle = zmq_ctx_new() + else: + self.handle = zmq_init(io_threads) + + if self.handle == NULL: + raise ZMQError() + + cdef int rc = 0 + if ZMQ_VERSION_MAJOR >= 3 and not self._shadow: + rc = zmq_ctx_set(self.handle, ZMQ_IO_THREADS, io_threads) + _check_rc(rc) + + self.closed = False + self._n_sockets = 0 + self._max_sockets = 32 + + self._sockets = malloc(self._max_sockets*sizeof(void *)) + if self._sockets == NULL: + raise MemoryError("Could not allocate _sockets array") + + self._pid = getpid() + + def __dealloc__(self): + """don't touch members in dealloc, just cleanup allocations""" + cdef int rc + if self._sockets != NULL: + free(self._sockets) + self._sockets = NULL + self._n_sockets = 0 + + # we can't call object methods in dealloc as it + # might already be partially deleted + if not self._shadow: + self._term() + + cdef inline void _add_socket(self, void* handle): + """Add a socket handle to be closed when Context terminates. + + This is to be called in the Socket constructor. + """ + if self._n_sockets >= self._max_sockets: + self._max_sockets *= 2 + self._sockets = realloc(self._sockets, self._max_sockets*sizeof(void *)) + if self._sockets == NULL: + raise MemoryError("Could not reallocate _sockets array") + + self._sockets[self._n_sockets] = handle + self._n_sockets += 1 + + cdef inline void _remove_socket(self, void* handle): + """Remove a socket from the collected handles. + + This should be called by Socket.close, to prevent trying to + close a socket a second time. + """ + cdef bint found = False + + for idx in range(self._n_sockets): + if self._sockets[idx] == handle: + found=True + break + + if found: + self._n_sockets -= 1 + if self._n_sockets: + # move last handle to closed socket's index + self._sockets[idx] = self._sockets[self._n_sockets] + + + @property + def underlying(self): + """The address of the underlying libzmq context""" + return self.handle + + # backward-compat, though nobody is using it + _handle = underlying + + cdef inline int _term(self): + cdef int rc=0 + if self.handle != NULL and not self.closed and getpid() == self._pid: + with nogil: + rc = zmq_ctx_destroy(self.handle) + self.handle = NULL + return rc + + def term(self): + """ctx.term() + + Close or terminate the context. + + This can be called to close the context by hand. If this is not called, + the context will automatically be closed when it is garbage collected. + """ + cdef int rc + rc = self._term() + self.closed = True + + def set(self, int option, optval): + """ctx.set(option, optval) + + Set a context option. + + See the 0MQ API documentation for zmq_ctx_set + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + option : int + The option to set. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IO_THREADS, zmq.MAX_SOCKETS + + optval : int + The value of the option to set. + """ + cdef int optval_int_c + cdef int rc + cdef char* optval_c + + if self.closed: + raise RuntimeError("Context has been destroyed") + + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int_c = optval + rc = zmq_ctx_set(self.handle, option, optval_int_c) + _check_rc(rc) + + def get(self, int option): + """ctx.get(option) + + Get the value of a context option. + + See the 0MQ API documentation for zmq_ctx_get + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + option : int + The option to get. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IO_THREADS, zmq.MAX_SOCKETS + + Returns + ------- + optval : int + The value of the option as an integer. + """ + cdef int optval_int_c + cdef size_t sz + cdef int rc + + if self.closed: + raise RuntimeError("Context has been destroyed") + + rc = zmq_ctx_get(self.handle, option) + _check_rc(rc) + + return rc + + def destroy(self, linger=None): + """ctx.destroy(linger=None) + + Close all sockets associated with this context, and then terminate + the context. If linger is specified, + the LINGER sockopt of the sockets will be set prior to closing. + + .. warning:: + + destroy involves calling ``zmq_close()``, which is **NOT** threadsafe. + If there are active sockets in other threads, this must not be called. + """ + + cdef int linger_c + cdef bint setlinger=False + + if linger is not None: + linger_c = linger + setlinger=True + + if self.handle != NULL and not self.closed and self._n_sockets: + while self._n_sockets: + if setlinger: + zmq_setsockopt(self._sockets[0], ZMQ_LINGER, &linger_c, sizeof(int)) + rc = zmq_close(self._sockets[0]) + if rc < 0 and zmq_errno() != ZMQ_ENOTSOCK: + raise ZMQError() + self._n_sockets -= 1 + self._sockets[0] = self._sockets[self._n_sockets] + self.term() + +__all__ = ['Context'] diff --git a/zmq/backend/cython/error.pyx b/zmq/backend/cython/error.pyx new file mode 100644 index 0000000..85e785f --- /dev/null +++ b/zmq/backend/cython/error.pyx @@ -0,0 +1,56 @@ +"""0MQ Error classes and functions.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# allow const char* +cdef extern from *: + ctypedef char* const_char_ptr "const char*" + +from libzmq cimport zmq_strerror, zmq_errno as zmq_errno_c + +from zmq.utils.strtypes import bytes + +def strerror(int errno): + """strerror(errno) + + Return the error string given the error number. + """ + cdef const_char_ptr str_e + # char * will be a bytes object: + str_e = zmq_strerror(errno) + if str is bytes: + # Python 2: str is bytes, so we already have the right type + return str_e + else: + # Python 3: decode bytes to unicode str + return str_e.decode() + +def zmq_errno(): + """zmq_errno() + + Return the integer errno of the most recent zmq error. + """ + return zmq_errno_c() + +__all__ = ['strerror', 'zmq_errno'] diff --git a/zmq/backend/cython/libzmq.pxd b/zmq/backend/cython/libzmq.pxd new file mode 100644 index 0000000..cc1ec53 --- /dev/null +++ b/zmq/backend/cython/libzmq.pxd @@ -0,0 +1,107 @@ +"""All the C imports for 0MQ""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Import the C header files +#----------------------------------------------------------------------------- + +cdef extern from *: + ctypedef void* const_void_ptr "const void *" + +cdef extern from "zmq_compat.h": + ctypedef signed long long int64_t "pyzmq_int64_t" + +include "constant_enums.pxi" + +cdef extern from "zmq.h" nogil: + + void _zmq_version "zmq_version"(int *major, int *minor, int *patch) + + ctypedef int fd_t "ZMQ_FD_T" + + enum: errno + char *zmq_strerror (int errnum) + int zmq_errno() + + void *zmq_ctx_new () + int zmq_ctx_destroy (void *context) + int zmq_ctx_set (void *context, int option, int optval) + int zmq_ctx_get (void *context, int option) + void *zmq_init (int io_threads) + int zmq_term (void *context) + + # blackbox def for zmq_msg_t + ctypedef void * zmq_msg_t "zmq_msg_t" + + ctypedef void zmq_free_fn(void *data, void *hint) + + int zmq_msg_init (zmq_msg_t *msg) + int zmq_msg_init_size (zmq_msg_t *msg, size_t size) + int zmq_msg_init_data (zmq_msg_t *msg, void *data, + size_t size, zmq_free_fn *ffn, void *hint) + int zmq_msg_send (zmq_msg_t *msg, void *s, int flags) + int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags) + int zmq_msg_close (zmq_msg_t *msg) + int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src) + int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src) + void *zmq_msg_data (zmq_msg_t *msg) + size_t zmq_msg_size (zmq_msg_t *msg) + int zmq_msg_more (zmq_msg_t *msg) + int zmq_msg_get (zmq_msg_t *msg, int option) + int zmq_msg_set (zmq_msg_t *msg, int option, int optval) + + void *zmq_socket (void *context, int type) + int zmq_close (void *s) + int zmq_setsockopt (void *s, int option, void *optval, size_t optvallen) + int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen) + int zmq_bind (void *s, char *addr) + int zmq_connect (void *s, char *addr) + int zmq_unbind (void *s, char *addr) + int zmq_disconnect (void *s, char *addr) + + int zmq_socket_monitor (void *s, char *addr, int flags) + + # send/recv + int zmq_sendbuf (void *s, const_void_ptr buf, size_t n, int flags) + int zmq_recvbuf (void *s, void *buf, size_t n, int flags) + + ctypedef struct zmq_pollitem_t: + void *socket + int fd + short events + short revents + + int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout) + + int zmq_device (int device_, void *insocket_, void *outsocket_) + int zmq_proxy (void *frontend, void *backend, void *capture) + +cdef extern from "zmq_utils.h" nogil: + + void *zmq_stopwatch_start () + unsigned long zmq_stopwatch_stop (void *watch_) + void zmq_sleep (int seconds_) + int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key) + diff --git a/zmq/backend/cython/message.pxd b/zmq/backend/cython/message.pxd new file mode 100644 index 0000000..4781195 --- /dev/null +++ b/zmq/backend/cython/message.pxd @@ -0,0 +1,63 @@ +"""0MQ Message related class declarations.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from cpython cimport PyBytes_FromStringAndSize + +from libzmq cimport zmq_msg_t, zmq_msg_data, zmq_msg_size + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +cdef class MessageTracker(object): + + cdef set events # Message Event objects to track. + cdef set peers # Other Message or MessageTracker objects. + + +cdef class Frame: + + cdef zmq_msg_t zmq_msg + cdef object _data # The actual message data as a Python object. + cdef object _buffer # A Python Buffer/View of the message contents + cdef object _bytes # A bytes/str copy of the message. + cdef bint _failed_init # Flag to handle failed zmq_msg_init + cdef public object tracker_event # Event for use with zmq_free_fn. + cdef public object tracker # MessageTracker object. + cdef public bint more # whether RCVMORE was set + + cdef Frame fast_copy(self) # Create shallow copy of Message object. + cdef object _getbuffer(self) # Construct self._buffer. + + +cdef inline object copy_zmq_msg_bytes(zmq_msg_t *zmq_msg): + """ Copy the data from a zmq_msg_t """ + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c + data_c = zmq_msg_data(zmq_msg) + data_len_c = zmq_msg_size(zmq_msg) + return PyBytes_FromStringAndSize(data_c, data_len_c) + + diff --git a/zmq/backend/cython/message.pyx b/zmq/backend/cython/message.pyx new file mode 100644 index 0000000..bef6e78 --- /dev/null +++ b/zmq/backend/cython/message.pyx @@ -0,0 +1,356 @@ +"""0MQ Message related classes.""" + +# +# Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# get version-independent aliases: +cdef extern from "pyversion_compat.h": + pass + +from cpython cimport Py_DECREF, Py_INCREF + +from buffers cimport asbuffer_r, viewfromobject_r + +cdef extern from "Python.h": + ctypedef int Py_ssize_t + +from libzmq cimport * + +from libc.stdio cimport fprintf, stderr as cstderr +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy + +import time + +try: + # below 3.3 + from threading import _Event as Event +except (ImportError, AttributeError): + # python throws ImportError, cython throws AttributeError + from threading import Event + +import zmq +from zmq.backend.cython.checkrc cimport _check_rc +from zmq.utils.strtypes import bytes,unicode,basestring + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +ctypedef struct zhint: + void *ctx + size_t id + +cdef void free_python_msg(void *data, void *vhint) nogil: + """A pure-C function for DECREF'ing Python-owned message data. + + Sends a message on a PUSH socket + + The hint is a `zhint` struct with two values: + + ctx (void *): pointer to the Garbage Collector's context + id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket, + signaling the Garbage Collector to remove its reference to the object. + + - A PUSH socket is created in the context, + - it is connected to the garbage collector inproc channel, + - it sends the gc message + - the PUSH socket is closed + + When the Garbage Collector's PULL socket receives the message, + it deletes its reference to the object, + allowing Python to free the memory. + """ + cdef void *push + cdef zmq_msg_t msg + cdef zhint *hint = vhint + if hint != NULL: + zmq_msg_init_size(&msg, sizeof(size_t)) + memcpy(zmq_msg_data(&msg), &hint.id, sizeof(size_t)) + + push = zmq_socket(hint.ctx, ZMQ_PUSH) + if push == NULL: + # this will happen if the context has been terminated + return + rc = zmq_connect(push, "inproc://pyzmq.gc.01") + if rc < 0: + fprintf(cstderr, "pyzmq-gc connect failed: %s\n", zmq_strerror(zmq_errno())) + return + + rc = zmq_msg_send(&msg, push, 0) + if rc < 0: + fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(zmq_errno())) + + zmq_msg_close(&msg) + zmq_close(push) + free(hint) + +gc = None + +cdef class Frame: + """Frame(data=None, track=False) + + A zmq message Frame class for non-copy send/recvs. + + This class is only needed if you want to do non-copying send and recvs. + When you pass a string to this class, like ``Frame(s)``, the + ref-count of `s` is increased by two: once because the Frame saves `s` as + an instance attribute and another because a ZMQ message is created that + points to the buffer of `s`. This second ref-count increase makes sure + that `s` lives until all messages that use it have been sent. Once 0MQ + sends all the messages and it doesn't need the buffer of s, 0MQ will call + ``Py_DECREF(s)``. + + Parameters + ---------- + + data : object, optional + any object that provides the buffer interface will be used to + construct the 0MQ message data. + track : bool [default: False] + whether a MessageTracker_ should be created to track this object. + Tracking a message has a cost at creation, because it creates a threadsafe + Event object. + + """ + + def __cinit__(self, object data=None, track=False, **kwargs): + cdef int rc + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c=0 + cdef zhint *hint + + # init more as False + self.more = False + + # Save the data object in case the user wants the the data as a str. + self._data = data + self._failed_init = True # bool switch for dealloc + self._buffer = None # buffer view of data + self._bytes = None # bytes copy of data + + # Event and MessageTracker for monitoring when zmq is done with data: + if track: + evt = Event() + self.tracker_event = evt + self.tracker = zmq.MessageTracker(evt) + else: + self.tracker_event = None + self.tracker = None + + if isinstance(data, unicode): + raise TypeError("Unicode objects not allowed. Only: str/bytes, buffer interfaces.") + + if data is None: + rc = zmq_msg_init(&self.zmq_msg) + _check_rc(rc) + self._failed_init = False + return + else: + asbuffer_r(data, &data_c, &data_len_c) + + # create the hint for zmq_free_fn + # two pointers: the gc context and a message to be sent to the gc PULL socket + # allows libzmq to signal to Python when it is done with Python-owned memory. + global gc + if gc is None: + from zmq.utils.garbage import gc + + hint = malloc(sizeof(zhint)) + hint.id = gc.store(data, self.tracker_event) + hint.ctx = gc._context.underlying + + rc = zmq_msg_init_data( + &self.zmq_msg, data_c, data_len_c, + free_python_msg, hint + ) + if rc != 0: + free(hint) + _check_rc(rc) + self._failed_init = False + + def __init__(self, object data=None, track=False): + """Enforce signature""" + pass + + def __dealloc__(self): + cdef int rc + if self._failed_init: + return + # This simply decreases the 0MQ ref-count of zmq_msg. + with nogil: + rc = zmq_msg_close(&self.zmq_msg) + _check_rc(rc) + + # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project + + def __getbuffer__(self, Py_buffer* buffer, int flags): + # new-style (memoryview) buffer interface + buffer.buf = zmq_msg_data(&self.zmq_msg) + buffer.len = zmq_msg_size(&self.zmq_msg) + + buffer.obj = self + buffer.readonly = 1 + buffer.format = "B" + buffer.ndim = 0 + buffer.shape = NULL + buffer.strides = NULL + buffer.suboffsets = NULL + buffer.itemsize = 1 + buffer.internal = NULL + + def __getsegcount__(self, Py_ssize_t *lenp): + # required for getreadbuffer + if lenp != NULL: + lenp[0] = zmq_msg_size(&self.zmq_msg) + return 1 + + def __getreadbuffer__(self, Py_ssize_t idx, void **p): + # old-style (buffer) interface + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + # read-only, because we don't want to allow + # editing of the message in-place + data_c = zmq_msg_data(&self.zmq_msg) + data_len_c = zmq_msg_size(&self.zmq_msg) + if p != NULL: + p[0] = data_c + return data_len_c + + # end buffer interface + + def __copy__(self): + """Create a shallow copy of the message. + + This does not copy the contents of the Frame, just the pointer. + This will increment the 0MQ ref count of the message, but not + the ref count of the Python object. That is only done once when + the Python is first turned into a 0MQ message. + """ + return self.fast_copy() + + cdef Frame fast_copy(self): + """Fast, cdef'd version of shallow copy of the Frame.""" + cdef Frame new_msg + new_msg = Frame() + # This does not copy the contents, but just increases the ref-count + # of the zmq_msg by one. + zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg) + # Copy the ref to data so the copy won't create a copy when str is + # called. + if self._data is not None: + new_msg._data = self._data + if self._buffer is not None: + new_msg._buffer = self._buffer + if self._bytes is not None: + new_msg._bytes = self._bytes + + # Frame copies share the tracker and tracker_event + new_msg.tracker_event = self.tracker_event + new_msg.tracker = self.tracker + + return new_msg + + def __len__(self): + """Return the length of the message in bytes.""" + cdef size_t sz + sz = zmq_msg_size(&self.zmq_msg) + return sz + # return zmq_msg_size(&self.zmq_msg) + + def __str__(self): + """Return the str form of the message.""" + if isinstance(self._data, bytes): + b = self._data + else: + b = self.bytes + if str is unicode: + return b.decode() + else: + return b + + cdef inline object _getbuffer(self): + """Create a Python buffer/view of the message data. + + This will be called only once, the first time the `buffer` property + is accessed. Subsequent calls use a cached copy. + """ + if self._data is None: + return viewfromobject_r(self) + else: + return viewfromobject_r(self._data) + + @property + def buffer(self): + """A read-only buffer view of the message contents.""" + if self._buffer is None: + self._buffer = self._getbuffer() + return self._buffer + + @property + def bytes(self): + """The message content as a Python bytes object. + + The first time this property is accessed, a copy of the message + contents is made. From then on that same copy of the message is + returned. + """ + if self._bytes is None: + self._bytes = copy_zmq_msg_bytes(&self.zmq_msg) + return self._bytes + + def set(self, int option, int value): + """Frame.set(option, value) + + Set a Frame option. + + See the 0MQ API documentation for zmq_msg_set + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + """ + cdef int rc = zmq_msg_set(&self.zmq_msg, option, value) + _check_rc(rc) + + def get(self, int option): + """Frame.get(option) + + Get a Frame option. + + See the 0MQ API documentation for zmq_msg_get + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + """ + cdef int rc = zmq_msg_get(&self.zmq_msg, option) + _check_rc(rc) + return rc + +# legacy Message name +Message = Frame + +__all__ = ['Frame', 'Message'] diff --git a/zmq/backend/cython/rebuffer.pyx b/zmq/backend/cython/rebuffer.pyx new file mode 100644 index 0000000..402e3b6 --- /dev/null +++ b/zmq/backend/cython/rebuffer.pyx @@ -0,0 +1,104 @@ +""" +Utility for changing itemsize of memoryviews, and getting +numpy arrays from byte-arrays that should be interpreted with a different +itemsize. + +Authors +------- +* MinRK +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley +# +# This file is part of pyzmq +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +from libc.stdlib cimport malloc +from buffers cimport * + +cdef inline object _rebuffer(object obj, char * format, int itemsize): + """clobber the format & itemsize of a 1-D + + This is the Python 3 model, but will work on Python >= 2.6. Currently, + we use it only on >= 3.0. + """ + cdef Py_buffer view + cdef int flags = PyBUF_SIMPLE + cdef int mode = 0 + # cdef Py_ssize_t *shape, *strides, *suboffsets + + mode = check_buffer(obj) + if mode == 0: + raise TypeError("%r does not provide a buffer interface."%obj) + + if mode == 3: + flags = PyBUF_ANY_CONTIGUOUS + if format: + flags |= PyBUF_FORMAT + PyObject_GetBuffer(obj, &view, flags) + assert view.ndim <= 1, "Can only reinterpret 1-D memoryviews" + assert view.len % itemsize == 0, "Buffer of length %i not divisible into items of size %i"%(view.len, itemsize) + # hack the format + view.ndim = 1 + view.format = format + view.itemsize = itemsize + view.strides = malloc(sizeof(Py_ssize_t)) + view.strides[0] = itemsize + view.shape = malloc(sizeof(Py_ssize_t)) + view.shape[0] = view.len/itemsize + view.suboffsets = malloc(sizeof(Py_ssize_t)) + view.suboffsets[0] = 0 + # for debug: make buffer writable, for zero-copy testing + # view.readonly = 0 + + return PyMemoryView_FromBuffer(&view) + else: + raise TypeError("This funciton is only for new-style buffer objects.") + +def rebuffer(obj, format, itemsize): + """Change the itemsize of a memoryview. + + Only for 1D contiguous buffers. + """ + return _rebuffer(obj, format, itemsize) + +def array_from_buffer(view, dtype, shape): + """Get a numpy array from a memoryview, regardless of the itemsize of the original + memoryview. This is important, because pyzmq does not send memoryview shape data + over the wire, so we need to change the memoryview itemsize before calling + asarray. + """ + import numpy + A = numpy.array([],dtype=dtype) + ref = viewfromobject(A,0) + fmt = ref.format.encode() + buf = viewfromobject(view, 0) + buf = _rebuffer(view, fmt, ref.itemsize) + return numpy.asarray(buf, dtype=dtype).reshape(shape) + +def print_view_info(obj): + """simple utility for printing info on a new-style buffer object""" + cdef Py_buffer view + cdef int flags = PyBUF_ANY_CONTIGUOUS|PyBUF_FORMAT + cdef int mode = 0 + + mode = check_buffer(obj) + if mode == 0: + raise TypeError("%r does not provide a buffer interface."%obj) + + if mode == 3: + PyObject_GetBuffer(obj, &view, flags) + print view.buf, view.len, view.format, view.ndim, + if view.ndim: + if view.shape: + print view.shape[0], + if view.strides: + print view.strides[0], + if view.suboffsets: + print view.suboffsets[0], + print + \ No newline at end of file diff --git a/zmq/backend/cython/socket.pxd b/zmq/backend/cython/socket.pxd new file mode 100644 index 0000000..b8a331e --- /dev/null +++ b/zmq/backend/cython/socket.pxd @@ -0,0 +1,47 @@ +"""0MQ Socket class declaration.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from context cimport Context + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +cdef class Socket: + + cdef object __weakref__ # enable weakref + cdef void *handle # The C handle for the underlying zmq object. + cdef bint _shadow # whether the Socket is a shadow wrapper of another + # Hold on to a reference to the context to make sure it is not garbage + # collected until the socket it done with it. + cdef public Context context # The zmq Context object that owns this. + cdef public bint _closed # bool property for a closed socket. + cdef int _pid # the pid of the process which created me (for fork safety) + + # cpdef methods for direct-cython access: + cpdef object send(self, object data, int flags=*, copy=*, track=*) + cpdef object recv(self, int flags=*, copy=*, track=*) + diff --git a/zmq/backend/cython/socket.pyx b/zmq/backend/cython/socket.pyx new file mode 100644 index 0000000..9267364 --- /dev/null +++ b/zmq/backend/cython/socket.pyx @@ -0,0 +1,669 @@ +"""0MQ Socket class.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Cython Imports +#----------------------------------------------------------------------------- + +# get version-independent aliases: +cdef extern from "pyversion_compat.h": + pass + +from libc.errno cimport ENAMETOOLONG +from libc.string cimport memcpy + +from cpython cimport PyBytes_FromStringAndSize +from cpython cimport PyBytes_AsString, PyBytes_Size +from cpython cimport Py_DECREF, Py_INCREF + +from buffers cimport asbuffer_r, viewfromobject_r + +from libzmq cimport * +from message cimport Frame, copy_zmq_msg_bytes + +from context cimport Context + +cdef extern from "Python.h": + ctypedef int Py_ssize_t + +cdef extern from "ipcmaxlen.h": + int get_ipc_path_max_len() + +cdef extern from "getpid_compat.h": + int getpid() + + +#----------------------------------------------------------------------------- +# Python Imports +#----------------------------------------------------------------------------- + +import copy as copy_mod +import time +import sys +import random +import struct +import codecs + +from zmq.utils import jsonapi + +try: + import cPickle + pickle = cPickle +except: + cPickle = None + import pickle + +import zmq +from zmq.backend.cython import constants +from zmq.backend.cython.constants import * +from zmq.backend.cython.checkrc cimport _check_rc +from zmq.error import ZMQError, ZMQBindError, _check_version +from zmq.utils.strtypes import bytes,unicode,basestring + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +IPC_PATH_MAX_LEN = get_ipc_path_max_len() + +# inline some small socket submethods: +# true methods frequently cannot be inlined, acc. Cython docs + +cdef inline _check_closed(Socket s): + """raise ENOTSUP if socket is closed + + Does not do a deep check + """ + if s._closed: + raise ZMQError(ENOTSOCK) + +cdef inline _check_closed_deep(Socket s): + """thorough check of whether the socket has been closed, + even if by another entity (e.g. ctx.destroy). + + Only used by the `closed` property. + + returns True if closed, False otherwise + """ + cdef int rc + cdef int errno + cdef int stype + cdef size_t sz=sizeof(int) + if s._closed: + return True + else: + rc = zmq_getsockopt(s.handle, ZMQ_TYPE, &stype, &sz) + if rc < 0 and zmq_errno() == ENOTSOCK: + s._closed = True + return True + else: + _check_rc(rc) + return False + +cdef inline Frame _recv_frame(void *handle, int flags=0, track=False): + """Receive a message in a non-copying manner and return a Frame.""" + cdef int rc + cdef Frame msg + msg = Frame(track=track) + + with nogil: + rc = zmq_msg_recv(&msg.zmq_msg, handle, flags) + + _check_rc(rc) + return msg + +cdef inline object _recv_copy(void *handle, int flags=0): + """Receive a message and return a copy""" + cdef zmq_msg_t zmq_msg + with nogil: + zmq_msg_init (&zmq_msg) + rc = zmq_msg_recv(&zmq_msg, handle, flags) + _check_rc(rc) + msg_bytes = copy_zmq_msg_bytes(&zmq_msg) + zmq_msg_close(&zmq_msg) + return msg_bytes + +cdef inline object _send_frame(void *handle, Frame msg, int flags=0): + """Send a Frame on this socket in a non-copy manner.""" + cdef int rc + cdef Frame msg_copy + + # Always copy so the original message isn't garbage collected. + # This doesn't do a real copy, just a reference. + msg_copy = msg.fast_copy() + + with nogil: + rc = zmq_msg_send(&msg_copy.zmq_msg, handle, flags) + + _check_rc(rc) + return msg.tracker + + +cdef inline object _send_copy(void *handle, object msg, int flags=0): + """Send a message on this socket by copying its content.""" + cdef int rc, rc2 + cdef zmq_msg_t data + cdef char *msg_c + cdef Py_ssize_t msg_c_len=0 + + # copy to c array: + asbuffer_r(msg, &msg_c, &msg_c_len) + + # Copy the msg before sending. This avoids any complications with + # the GIL, etc. + # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error) + rc = zmq_msg_init_size(&data, msg_c_len) + + _check_rc(rc) + + with nogil: + memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data)) + rc = zmq_msg_send(&data, handle, flags) + rc2 = zmq_msg_close(&data) + _check_rc(rc) + _check_rc(rc2) + + +cdef class Socket: + """Socket(context, socket_type) + + A 0MQ socket. + + These objects will generally be constructed via the socket() method of a Context object. + + Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads. + + Parameters + ---------- + context : Context + The 0MQ Context this Socket belongs to. + socket_type : int + The socket type, which can be any of the 0MQ socket types: + REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB. + + See Also + -------- + .Context.socket : method for creating a socket bound to a Context. + """ + + # no-op for the signature + def __init__(self, context=None, socket_type=-1, shadow=0): + pass + + def __cinit__(self, Context context=None, int socket_type=-1, size_t shadow=0, *args, **kwargs): + cdef Py_ssize_t c_handle + + self.handle = NULL + self.context = context + if shadow: + self._shadow = True + self.handle = shadow + else: + if context is None: + raise TypeError("context must be specified") + if socket_type < 0: + raise TypeError("socket_type must be specified") + self._shadow = False + self.handle = zmq_socket(context.handle, socket_type) + if self.handle == NULL: + raise ZMQError() + self._closed = False + self._pid = getpid() + if context: + context._add_socket(self.handle) + + def __dealloc__(self): + """remove from context's list + + But be careful that context might not exist if called during gc + """ + if self.handle != NULL and not self._shadow and getpid() == self._pid: + # during gc, self.context might be NULL + if self.context: + self.context._remove_socket(self.handle) + + @property + def underlying(self): + """The address of the underlying libzmq socket""" + return self.handle + + @property + def closed(self): + return _check_closed_deep(self) + + def close(self, linger=None): + """s.close(linger=None) + + Close the socket. + + If linger is specified, LINGER sockopt will be set prior to closing. + + This can be called to close the socket by hand. If this is not + called, the socket will automatically be closed when it is + garbage collected. + """ + cdef int rc=0 + cdef int linger_c + cdef bint setlinger=False + + if linger is not None: + linger_c = linger + setlinger=True + + if self.handle != NULL and not self._closed and getpid() == self._pid: + if setlinger: + zmq_setsockopt(self.handle, ZMQ_LINGER, &linger_c, sizeof(int)) + rc = zmq_close(self.handle) + if rc != 0 and zmq_errno() != ENOTSOCK: + # ignore ENOTSOCK (closed by Context) + _check_rc(rc) + self._closed = True + # during gc, self.context might be NULL + if self.context: + self.context._remove_socket(self.handle) + self.handle = NULL + + def set(self, int option, optval): + """s.set(option, optval) + + Set socket options. + + See the 0MQ API documentation for details on specific options. + + Parameters + ---------- + option : int + The option to set. Available values will depend on your + version of libzmq. Examples include:: + + zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD + + optval : int or bytes + The value of the option to set. + """ + cdef int64_t optval_int64_c + cdef int optval_int_c + cdef int rc + cdef char* optval_c + cdef Py_ssize_t sz + + _check_closed(self) + if isinstance(optval, unicode): + raise TypeError("unicode not allowed, use setsockopt_string") + + if option in zmq.constants.bytes_sockopts: + if not isinstance(optval, bytes): + raise TypeError('expected bytes, got: %r' % optval) + optval_c = PyBytes_AsString(optval) + sz = PyBytes_Size(optval) + rc = zmq_setsockopt( + self.handle, option, + optval_c, sz + ) + elif option in zmq.constants.int64_sockopts: + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int64_c = optval + rc = zmq_setsockopt( + self.handle, option, + &optval_int64_c, sizeof(int64_t) + ) + else: + # default is to assume int, which is what most new sockopts will be + # this lets pyzmq work with newer libzmq which may add constants + # pyzmq has not yet added, rather than artificially raising. Invalid + # sockopts will still raise just the same, but it will be libzmq doing + # the raising. + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int_c = optval + rc = zmq_setsockopt( + self.handle, option, + &optval_int_c, sizeof(int) + ) + + _check_rc(rc) + + def get(self, int option): + """s.get(option) + + Get the value of a socket option. + + See the 0MQ API documentation for details on specific options. + + Parameters + ---------- + option : int + The option to get. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IDENTITY, HWM, LINGER, FD, EVENTS + + Returns + ------- + optval : int or bytes + The value of the option as a bytestring or int. + """ + cdef int64_t optval_int64_c + cdef int optval_int_c + cdef fd_t optval_fd_c + cdef char identity_str_c [255] + cdef size_t sz + cdef int rc + + _check_closed(self) + + if option in zmq.constants.bytes_sockopts: + sz = 255 + rc = zmq_getsockopt(self.handle, option, identity_str_c, &sz) + _check_rc(rc) + # strip null-terminated strings *except* identity + if option != ZMQ_IDENTITY and sz > 0 and (identity_str_c)[sz-1] == b'\0': + sz -= 1 + result = PyBytes_FromStringAndSize(identity_str_c, sz) + elif option in zmq.constants.int64_sockopts: + sz = sizeof(int64_t) + rc = zmq_getsockopt(self.handle, option, &optval_int64_c, &sz) + _check_rc(rc) + result = optval_int64_c + elif option == ZMQ_FD: + sz = sizeof(fd_t) + rc = zmq_getsockopt(self.handle, option, &optval_fd_c, &sz) + _check_rc(rc) + result = optval_fd_c + else: + # default is to assume int, which is what most new sockopts will be + # this lets pyzmq work with newer libzmq which may add constants + # pyzmq has not yet added, rather than artificially raising. Invalid + # sockopts will still raise just the same, but it will be libzmq doing + # the raising. + sz = sizeof(int) + rc = zmq_getsockopt(self.handle, option, &optval_int_c, &sz) + _check_rc(rc) + result = optval_int_c + + return result + + def bind(self, addr): + """s.bind(addr) + + Bind the socket to an address. + + This causes the socket to listen on a network port. Sockets on the + other side of this connection will use ``Socket.connect(addr)`` to + connect to this socket. + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported include + tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + rc = zmq_bind(self.handle, c_addr) + if rc != 0: + if IPC_PATH_MAX_LEN and zmq_errno() == ENAMETOOLONG: + # py3compat: addr is bytes, but msg wants str + if str is unicode: + addr = addr.decode('utf-8', 'replace') + path = addr.split('://', 1)[-1] + msg = ('ipc path "{0}" is longer than {1} ' + 'characters (sizeof(sockaddr_un.sun_path)). ' + 'zmq.IPC_PATH_MAX_LEN constant can be used ' + 'to check addr length (if it is defined).' + .format(path, IPC_PATH_MAX_LEN)) + raise ZMQError(msg=msg) + _check_rc(rc) + + def connect(self, addr): + """s.connect(addr) + + Connect to a remote 0MQ socket. + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_connect(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def unbind(self, addr): + """s.unbind(addr) + + Unbind from an address (undoes a call to bind). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_version((3,2), "unbind") + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_unbind(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def disconnect(self, addr): + """s.disconnect(addr) + + Disconnect from a remote 0MQ socket (undoes a call to connect). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_version((3,2), "disconnect") + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_disconnect(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def monitor(self, addr, int events=ZMQ_EVENT_ALL): + """s.monitor(addr, flags) + + Start publishing socket events on inproc. + See libzmq docs for zmq_monitor for details. + + While this function is available from libzmq 3.2, + pyzmq cannot parse monitor messages from libzmq prior to 4.0. + + .. versionadded: libzmq-3.2 + .. versionadded: 14.0 + + Parameters + ---------- + addr : str + The inproc url used for monitoring. + events : int [default: zmq.EVENT_ALL] + The zmq event bitmask for which events will be sent to the monitor. + """ + cdef int rc, c_flags + cdef char* c_addr + + _check_version((3,2), "monitor") + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + c_flags = events + rc = zmq_socket_monitor(self.handle, c_addr, c_flags) + _check_rc(rc) + + #------------------------------------------------------------------------- + # Sending and receiving messages + #------------------------------------------------------------------------- + + cpdef object send(self, object data, int flags=0, copy=True, track=False): + """s.send(data, flags=0, copy=True, track=False) + + Send a message on this socket. + + This queues the message to be sent by the IO thread at a later time. + + Parameters + ---------- + data : object, str, Frame + The content of the message. + flags : int + Any supported flag: NOBLOCK, SNDMORE. + copy : bool + Should the message be sent in a copying or non-copying manner. + track : bool + Should the message be tracked for notification that ZMQ has + finished with it? (ignored if copy=True) + + Returns + ------- + None : if `copy` or not track + None if message was sent, raises an exception otherwise. + MessageTracker : if track and not copy + a MessageTracker object, whose `pending` property will + be True until the send is completed. + + Raises + ------ + TypeError + If a unicode object is passed + ValueError + If `track=True`, but an untracked Frame is passed. + ZMQError + If the send does not succeed for any reason. + + """ + _check_closed(self) + + if isinstance(data, unicode): + raise TypeError("unicode not allowed, use send_string") + + if copy: + # msg.bytes never returns the input data object + # it is always a copy, but always the same copy + if isinstance(data, Frame): + data = data.buffer + return _send_copy(self.handle, data, flags) + else: + if isinstance(data, Frame): + if track and not data.tracker: + raise ValueError('Not a tracked message') + msg = data + else: + msg = Frame(data, track=track) + return _send_frame(self.handle, msg, flags) + + cpdef object recv(self, int flags=0, copy=True, track=False): + """s.recv(flags=0, copy=True, track=False) + + Receive a message. + + Parameters + ---------- + flags : int + Any supported flag: NOBLOCK. If NOBLOCK is set, this method + will raise a ZMQError with EAGAIN if a message is not ready. + If NOBLOCK is not set, then this method will block until a + message arrives. + copy : bool + Should the message be received in a copying or non-copying manner? + If False a Frame object is returned, if True a string copy of + message is returned. + track : bool + Should the message be tracked for notification that ZMQ has + finished with it? (ignored if copy=True) + + Returns + ------- + msg : bytes, Frame + The received message frame. If `copy` is False, then it will be a Frame, + otherwise it will be bytes. + + Raises + ------ + ZMQError + for any of the reasons zmq_msg_recv might fail. + """ + _check_closed(self) + + if copy: + return _recv_copy(self.handle, flags) + else: + frame = _recv_frame(self.handle, flags, track) + frame.more = self.getsockopt(zmq.RCVMORE) + return frame + + +__all__ = ['Socket', 'IPC_PATH_MAX_LEN'] diff --git a/zmq/backend/cython/utils.pxd b/zmq/backend/cython/utils.pxd new file mode 100644 index 0000000..1d7117f --- /dev/null +++ b/zmq/backend/cython/utils.pxd @@ -0,0 +1,29 @@ +"""Wrap zmq_utils.h""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +cdef class Stopwatch: + cdef void *watch # The C handle for the underlying zmq object + diff --git a/zmq/backend/cython/utils.pyx b/zmq/backend/cython/utils.pyx new file mode 100644 index 0000000..3283cee --- /dev/null +++ b/zmq/backend/cython/utils.pyx @@ -0,0 +1,111 @@ +"""0MQ utils.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport zmq_stopwatch_start, zmq_stopwatch_stop, zmq_sleep, zmq_curve_keypair + +from zmq.error import ZMQError, _check_rc, _check_version + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +def curve_keypair(): + """generate a Z85 keypair for use with zmq.CURVE security + + Requires libzmq (≥ 4.0) to have been linked with libsodium. + + .. versionadded:: libzmq-4.0 + .. versionadded:: 14.0 + + Returns + ------- + (public, secret) : two bytestrings + The public and private keypair as 40 byte z85-encoded bytestrings. + """ + cdef int rc + cdef char[64] public_key + cdef char[64] secret_key + _check_version((4,0), "curve_keypair") + rc = zmq_curve_keypair (public_key, secret_key) + _check_rc(rc) + return public_key, secret_key + + +cdef class Stopwatch: + """Stopwatch() + + A simple stopwatch based on zmq_stopwatch_start/stop. + + This class should be used for benchmarking and timing 0MQ code. + """ + + def __cinit__(self): + self.watch = NULL + + def __dealloc__(self): + # copy of self.stop() we can't call object methods in dealloc as it + # might already be partially deleted + if self.watch: + zmq_stopwatch_stop(self.watch) + self.watch = NULL + + def start(self): + """s.start() + + Start the stopwatch. + """ + if self.watch == NULL: + self.watch = zmq_stopwatch_start() + else: + raise ZMQError('Stopwatch is already running.') + + def stop(self): + """s.stop() + + Stop the stopwatch. + + Returns + ------- + t : unsigned long int + the number of microseconds since ``start()`` was called. + """ + cdef unsigned long time + if self.watch == NULL: + raise ZMQError('Must start the Stopwatch before calling stop.') + else: + time = zmq_stopwatch_stop(self.watch) + self.watch = NULL + return time + + def sleep(self, int seconds): + """s.sleep(seconds) + + Sleep for an integer number of seconds. + """ + with nogil: + zmq_sleep(seconds) + + +__all__ = ['curve_keypair', 'Stopwatch'] -- cgit v1.2.3