From 7d5c3dcd969161322deed6c43f8a6a3cb92c3369 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:53:55 -0500 Subject: upgrade to 14.4.1 --- zmq/backend/__init__.py | 45 +++ zmq/backend/cffi/__init__.py | 22 ++ zmq/backend/cffi/_cdefs.h | 68 ++++ zmq/backend/cffi/_cffi.py | 127 +++++++ zmq/backend/cffi/_poll.py | 56 +++ zmq/backend/cffi/_verify.c | 12 + zmq/backend/cffi/constants.py | 15 + zmq/backend/cffi/context.py | 100 +++++ zmq/backend/cffi/devices.py | 24 ++ zmq/backend/cffi/error.py | 13 + zmq/backend/cffi/message.py | 69 ++++ zmq/backend/cffi/socket.py | 244 ++++++++++++ zmq/backend/cffi/utils.py | 62 ++++ zmq/backend/cython/__init__.py | 23 ++ zmq/backend/cython/_device.pyx | 89 +++++ zmq/backend/cython/_poll.pyx | 137 +++++++ zmq/backend/cython/_version.pyx | 43 +++ zmq/backend/cython/checkrc.pxd | 23 ++ zmq/backend/cython/constant_enums.pxi | 156 ++++++++ zmq/backend/cython/constants.pxi | 318 ++++++++++++++++ zmq/backend/cython/constants.pyx | 32 ++ zmq/backend/cython/context.pxd | 41 +++ zmq/backend/cython/context.pyx | 243 ++++++++++++ zmq/backend/cython/error.pyx | 56 +++ zmq/backend/cython/libzmq.pxd | 110 ++++++ zmq/backend/cython/message.pxd | 63 ++++ zmq/backend/cython/message.pyx | 381 +++++++++++++++++++ zmq/backend/cython/rebuffer.pyx | 104 ++++++ zmq/backend/cython/socket.pxd | 47 +++ zmq/backend/cython/socket.pyx | 672 ++++++++++++++++++++++++++++++++++ zmq/backend/cython/utils.pxd | 29 ++ zmq/backend/cython/utils.pyx | 119 ++++++ zmq/backend/select.py | 39 ++ 33 files changed, 3582 insertions(+) create mode 100644 zmq/backend/__init__.py create mode 100644 zmq/backend/cffi/__init__.py create mode 100644 zmq/backend/cffi/_cdefs.h create mode 100644 zmq/backend/cffi/_cffi.py create mode 100644 zmq/backend/cffi/_poll.py create mode 100644 zmq/backend/cffi/_verify.c create mode 100644 zmq/backend/cffi/constants.py create mode 100644 zmq/backend/cffi/context.py create mode 100644 zmq/backend/cffi/devices.py create mode 100644 zmq/backend/cffi/error.py create mode 100644 zmq/backend/cffi/message.py create mode 100644 zmq/backend/cffi/socket.py create mode 100644 zmq/backend/cffi/utils.py create mode 100644 zmq/backend/cython/__init__.py create mode 100644 zmq/backend/cython/_device.pyx create mode 100644 zmq/backend/cython/_poll.pyx create mode 100644 zmq/backend/cython/_version.pyx create mode 100644 zmq/backend/cython/checkrc.pxd create mode 100644 zmq/backend/cython/constant_enums.pxi create mode 100644 zmq/backend/cython/constants.pxi create mode 100644 zmq/backend/cython/constants.pyx create mode 100644 zmq/backend/cython/context.pxd create mode 100644 zmq/backend/cython/context.pyx create mode 100644 zmq/backend/cython/error.pyx create mode 100644 zmq/backend/cython/libzmq.pxd create mode 100644 zmq/backend/cython/message.pxd create mode 100644 zmq/backend/cython/message.pyx create mode 100644 zmq/backend/cython/rebuffer.pyx create mode 100644 zmq/backend/cython/socket.pxd create mode 100644 zmq/backend/cython/socket.pyx create mode 100644 zmq/backend/cython/utils.pxd create mode 100644 zmq/backend/cython/utils.pyx create mode 100644 zmq/backend/select.py (limited to 'zmq/backend') diff --git a/zmq/backend/__init__.py b/zmq/backend/__init__.py new file mode 100644 index 0000000..7cac725 --- /dev/null +++ b/zmq/backend/__init__.py @@ -0,0 +1,45 @@ +"""Import basic exposure of libzmq C API as a backend""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + + +import os +import platform +import sys + +from zmq.utils.sixcerpt import reraise + +from .select import public_api, select_backend + +if 'PYZMQ_BACKEND' in os.environ: + backend = os.environ['PYZMQ_BACKEND'] + if backend in ('cython', 'cffi'): + backend = 'zmq.backend.%s' % backend + _ns = select_backend(backend) +else: + # default to cython, fallback to cffi + # (reverse on PyPy) + if platform.python_implementation() == 'PyPy': + first, second = ('zmq.backend.cffi', 'zmq.backend.cython') + else: + first, second = ('zmq.backend.cython', 'zmq.backend.cffi') + + try: + _ns = select_backend(first) + except Exception: + exc_info = sys.exc_info() + exc = exc_info[1] + try: + _ns = select_backend(second) + except ImportError: + # prevent 'During handling of the above exception...' on py3 + # can't use `raise ... from` on Python 2 + if hasattr(exc, '__cause__'): + exc.__cause__ = None + # raise the *first* error, not the fallback + reraise(*exc_info) + +globals().update(_ns) + +__all__ = public_api diff --git a/zmq/backend/cffi/__init__.py b/zmq/backend/cffi/__init__.py new file mode 100644 index 0000000..ca3164d --- /dev/null +++ b/zmq/backend/cffi/__init__.py @@ -0,0 +1,22 @@ +"""CFFI backend (for PyPY)""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from zmq.backend.cffi import (constants, error, message, context, socket, + _poll, devices, utils) + +__all__ = [] +for submod in (constants, error, message, context, socket, + _poll, devices, utils): + __all__.extend(submod.__all__) + +from .constants import * +from .error import * +from .message import * +from .context import * +from .socket import * +from .devices import * +from ._poll import * +from ._cffi import zmq_version_info, ffi +from .utils import * diff --git a/zmq/backend/cffi/_cdefs.h b/zmq/backend/cffi/_cdefs.h new file mode 100644 index 0000000..d330057 --- /dev/null +++ b/zmq/backend/cffi/_cdefs.h @@ -0,0 +1,68 @@ +void zmq_version(int *major, int *minor, int *patch); + +void* zmq_socket(void *context, int type); +int zmq_close(void *socket); + +int zmq_bind(void *socket, const char *endpoint); +int zmq_connect(void *socket, const char *endpoint); + +int zmq_errno(void); +const char * zmq_strerror(int errnum); + +void* zmq_stopwatch_start(void); +unsigned long zmq_stopwatch_stop(void *watch); +void zmq_sleep(int seconds_); +int zmq_device(int device, void *frontend, void *backend); + +int zmq_unbind(void *socket, const char *endpoint); +int zmq_disconnect(void *socket, const char *endpoint); +void* zmq_ctx_new(); +int zmq_ctx_destroy(void *context); +int zmq_ctx_get(void *context, int opt); +int zmq_ctx_set(void *context, int opt, int optval); +int zmq_proxy(void *frontend, void *backend, void *capture); +int zmq_socket_monitor(void *socket, const char *addr, int events); + +int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key); +int zmq_has (const char *capability); + +typedef struct { ...; } zmq_msg_t; +typedef ... zmq_free_fn; + +int zmq_msg_init(zmq_msg_t *msg); +int zmq_msg_init_size(zmq_msg_t *msg, size_t size); +int zmq_msg_init_data(zmq_msg_t *msg, + void *data, + size_t size, + zmq_free_fn *ffn, + void *hint); + +size_t zmq_msg_size(zmq_msg_t *msg); +void *zmq_msg_data(zmq_msg_t *msg); +int zmq_msg_close(zmq_msg_t *msg); + +int zmq_msg_send(zmq_msg_t *msg, void *socket, int flags); +int zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags); + +int zmq_getsockopt(void *socket, + int option_name, + void *option_value, + size_t *option_len); + +int zmq_setsockopt(void *socket, + int option_name, + const void *option_value, + size_t option_len); +typedef struct +{ + void *socket; + int fd; + short events; + short revents; +} zmq_pollitem_t; + +int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout); + +// miscellany +void * memcpy(void *restrict s1, const void *restrict s2, size_t n); +int get_ipc_path_max_len(void); diff --git a/zmq/backend/cffi/_cffi.py b/zmq/backend/cffi/_cffi.py new file mode 100644 index 0000000..c73ebf8 --- /dev/null +++ b/zmq/backend/cffi/_cffi.py @@ -0,0 +1,127 @@ +# coding: utf-8 +"""The main CFFI wrapping of libzmq""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + + +import json +import os +from os.path import dirname, join +from cffi import FFI + +from zmq.utils.constant_names import all_names, no_prefix + + +base_zmq_version = (3,2,2) + +def load_compiler_config(): + """load pyzmq compiler arguments""" + import zmq + zmq_dir = dirname(zmq.__file__) + zmq_parent = dirname(zmq_dir) + + fname = join(zmq_dir, 'utils', 'compiler.json') + if os.path.exists(fname): + with open(fname) as f: + cfg = json.load(f) + else: + cfg = {} + + cfg.setdefault("include_dirs", []) + cfg.setdefault("library_dirs", []) + cfg.setdefault("runtime_library_dirs", []) + cfg.setdefault("libraries", ["zmq"]) + + # cast to str, because cffi can't handle unicode paths (?!) + cfg['libraries'] = [str(lib) for lib in cfg['libraries']] + for key in ("include_dirs", "library_dirs", "runtime_library_dirs"): + # interpret paths relative to parent of zmq (like source tree) + abs_paths = [] + for p in cfg[key]: + if p.startswith('zmq'): + p = join(zmq_parent, p) + abs_paths.append(str(p)) + cfg[key] = abs_paths + return cfg + + +def zmq_version_info(): + """Get libzmq version as tuple of ints""" + major = ffi.new('int*') + minor = ffi.new('int*') + patch = ffi.new('int*') + + C.zmq_version(major, minor, patch) + + return (int(major[0]), int(minor[0]), int(patch[0])) + + +cfg = load_compiler_config() +ffi = FFI() + +def _make_defines(names): + _names = [] + for name in names: + define_line = "#define %s ..." % (name) + _names.append(define_line) + + return "\n".join(_names) + +c_constant_names = [] +for name in all_names: + if no_prefix(name): + c_constant_names.append(name) + else: + c_constant_names.append("ZMQ_" + name) + +# load ffi definitions +here = os.path.dirname(__file__) +with open(os.path.join(here, '_cdefs.h')) as f: + _cdefs = f.read() + +with open(os.path.join(here, '_verify.c')) as f: + _verify = f.read() + +ffi.cdef(_cdefs) +ffi.cdef(_make_defines(c_constant_names)) + +try: + C = ffi.verify(_verify, + modulename='_cffi_ext', + libraries=cfg['libraries'], + include_dirs=cfg['include_dirs'], + library_dirs=cfg['library_dirs'], + runtime_library_dirs=cfg['runtime_library_dirs'], + ) + _version_info = zmq_version_info() +except Exception as e: + raise ImportError("PyZMQ CFFI backend couldn't find zeromq: %s\n" + "Please check that you have zeromq headers and libraries." % e) + +if _version_info < (3,2,2): + raise ImportError("PyZMQ CFFI backend requires zeromq >= 3.2.2," + " but found %i.%i.%i" % _version_info + ) + +nsp = new_sizet_pointer = lambda length: ffi.new('size_t*', length) + +new_uint64_pointer = lambda: (ffi.new('uint64_t*'), + nsp(ffi.sizeof('uint64_t'))) +new_int64_pointer = lambda: (ffi.new('int64_t*'), + nsp(ffi.sizeof('int64_t'))) +new_int_pointer = lambda: (ffi.new('int*'), + nsp(ffi.sizeof('int'))) +new_binary_data = lambda length: (ffi.new('char[%d]' % (length)), + nsp(ffi.sizeof('char') * length)) + +value_uint64_pointer = lambda val : (ffi.new('uint64_t*', val), + ffi.sizeof('uint64_t')) +value_int64_pointer = lambda val: (ffi.new('int64_t*', val), + ffi.sizeof('int64_t')) +value_int_pointer = lambda val: (ffi.new('int*', val), + ffi.sizeof('int')) +value_binary_data = lambda val, length: (ffi.new('char[%d]' % (length + 1), val), + ffi.sizeof('char') * length) + +IPC_PATH_MAX_LEN = C.get_ipc_path_max_len() diff --git a/zmq/backend/cffi/_poll.py b/zmq/backend/cffi/_poll.py new file mode 100644 index 0000000..9bca34c --- /dev/null +++ b/zmq/backend/cffi/_poll.py @@ -0,0 +1,56 @@ +# coding: utf-8 +"""zmq poll function""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from ._cffi import C, ffi, zmq_version_info + +from .constants import * + +from zmq.error import _check_rc + + +def _make_zmq_pollitem(socket, flags): + zmq_socket = socket._zmq_socket + zmq_pollitem = ffi.new('zmq_pollitem_t*') + zmq_pollitem.socket = zmq_socket + zmq_pollitem.fd = 0 + zmq_pollitem.events = flags + zmq_pollitem.revents = 0 + return zmq_pollitem[0] + +def _make_zmq_pollitem_fromfd(socket_fd, flags): + zmq_pollitem = ffi.new('zmq_pollitem_t*') + zmq_pollitem.socket = ffi.NULL + zmq_pollitem.fd = socket_fd + zmq_pollitem.events = flags + zmq_pollitem.revents = 0 + return zmq_pollitem[0] + +def zmq_poll(sockets, timeout): + cffi_pollitem_list = [] + low_level_to_socket_obj = {} + for item in sockets: + if isinstance(item[0], int): + low_level_to_socket_obj[item[0]] = item + cffi_pollitem_list.append(_make_zmq_pollitem_fromfd(item[0], item[1])) + else: + low_level_to_socket_obj[item[0]._zmq_socket] = item + cffi_pollitem_list.append(_make_zmq_pollitem(item[0], item[1])) + items = ffi.new('zmq_pollitem_t[]', cffi_pollitem_list) + list_length = ffi.cast('int', len(cffi_pollitem_list)) + c_timeout = ffi.cast('long', timeout) + rc = C.zmq_poll(items, list_length, c_timeout) + _check_rc(rc) + result = [] + for index in range(len(items)): + if not items[index].socket == ffi.NULL: + if items[index].revents > 0: + result.append((low_level_to_socket_obj[items[index].socket][0], + items[index].revents)) + else: + result.append((items[index].fd, items[index].revents)) + return result + +__all__ = ['zmq_poll'] diff --git a/zmq/backend/cffi/_verify.c b/zmq/backend/cffi/_verify.c new file mode 100644 index 0000000..547840e --- /dev/null +++ b/zmq/backend/cffi/_verify.c @@ -0,0 +1,12 @@ +#include +#include +#include + +#include +#include +#include "zmq_compat.h" + +int get_ipc_path_max_len(void) { + struct sockaddr_un *dummy; + return sizeof(dummy->sun_path) - 1; +} diff --git a/zmq/backend/cffi/constants.py b/zmq/backend/cffi/constants.py new file mode 100644 index 0000000..ee293e7 --- /dev/null +++ b/zmq/backend/cffi/constants.py @@ -0,0 +1,15 @@ +# coding: utf-8 +"""zmq constants""" + +from ._cffi import C, c_constant_names +from zmq.utils.constant_names import all_names + +g = globals() +for cname in c_constant_names: + if cname.startswith("ZMQ_"): + name = cname[4:] + else: + name = cname + g[name] = getattr(C, cname) + +__all__ = all_names diff --git a/zmq/backend/cffi/context.py b/zmq/backend/cffi/context.py new file mode 100644 index 0000000..16a7b25 --- /dev/null +++ b/zmq/backend/cffi/context.py @@ -0,0 +1,100 @@ +# coding: utf-8 +"""zmq Context class""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +import weakref + +from ._cffi import C, ffi + +from .socket import * +from .constants import * + +from zmq.error import ZMQError, _check_rc + +class Context(object): + _zmq_ctx = None + _iothreads = None + _closed = None + _sockets = None + _shadow = False + + def __init__(self, io_threads=1, shadow=None): + + if shadow: + self._zmq_ctx = ffi.cast("void *", shadow) + self._shadow = True + else: + self._shadow = False + if not io_threads >= 0: + raise ZMQError(EINVAL) + + self._zmq_ctx = C.zmq_ctx_new() + if self._zmq_ctx == ffi.NULL: + raise ZMQError(C.zmq_errno()) + if not shadow: + C.zmq_ctx_set(self._zmq_ctx, IO_THREADS, io_threads) + self._closed = False + self._sockets = set() + + @property + def underlying(self): + """The address of the underlying libzmq context""" + return int(ffi.cast('size_t', self._zmq_ctx)) + + @property + def closed(self): + return self._closed + + def _add_socket(self, socket): + ref = weakref.ref(socket) + self._sockets.add(ref) + return ref + + def _rm_socket(self, ref): + if ref in self._sockets: + self._sockets.remove(ref) + + def set(self, option, value): + """set a context option + + see zmq_ctx_set + """ + rc = C.zmq_ctx_set(self._zmq_ctx, option, value) + _check_rc(rc) + + def get(self, option): + """get context option + + see zmq_ctx_get + """ + rc = C.zmq_ctx_get(self._zmq_ctx, option) + _check_rc(rc) + return rc + + def term(self): + if self.closed: + return + + C.zmq_ctx_destroy(self._zmq_ctx) + + self._zmq_ctx = None + self._closed = True + + def destroy(self, linger=None): + if self.closed: + return + + sockets = self._sockets + self._sockets = set() + for s in sockets: + s = s() + if s and not s.closed: + if linger: + s.setsockopt(LINGER, linger) + s.close() + + self.term() + +__all__ = ['Context'] diff --git a/zmq/backend/cffi/devices.py b/zmq/backend/cffi/devices.py new file mode 100644 index 0000000..c7a514a --- /dev/null +++ b/zmq/backend/cffi/devices.py @@ -0,0 +1,24 @@ +# coding: utf-8 +"""zmq device functions""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from ._cffi import C, ffi, zmq_version_info +from .socket import Socket +from zmq.error import ZMQError, _check_rc + +def device(device_type, frontend, backend): + rc = C.zmq_proxy(frontend._zmq_socket, backend._zmq_socket, ffi.NULL) + _check_rc(rc) + +def proxy(frontend, backend, capture=None): + if isinstance(capture, Socket): + capture = capture._zmq_socket + else: + capture = ffi.NULL + + rc = C.zmq_proxy(frontend._zmq_socket, backend._zmq_socket, capture) + _check_rc(rc) + +__all__ = ['device', 'proxy'] diff --git a/zmq/backend/cffi/error.py b/zmq/backend/cffi/error.py new file mode 100644 index 0000000..3bb64de --- /dev/null +++ b/zmq/backend/cffi/error.py @@ -0,0 +1,13 @@ +"""zmq error functions""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from ._cffi import C, ffi + +def strerror(errno): + return ffi.string(C.zmq_strerror(errno)) + +zmq_errno = C.zmq_errno + +__all__ = ['strerror', 'zmq_errno'] diff --git a/zmq/backend/cffi/message.py b/zmq/backend/cffi/message.py new file mode 100644 index 0000000..c35decb --- /dev/null +++ b/zmq/backend/cffi/message.py @@ -0,0 +1,69 @@ +"""Dummy Frame object""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from ._cffi import ffi, C + +import zmq +from zmq.utils.strtypes import unicode + +try: + view = memoryview +except NameError: + view = buffer + +_content = lambda x: x.tobytes() if type(x) == memoryview else x + +class Frame(object): + _data = None + tracker = None + closed = False + more = False + buffer = None + + + def __init__(self, data, track=False): + try: + view(data) + except TypeError: + raise + + self._data = data + + if isinstance(data, unicode): + raise TypeError("Unicode objects not allowed. Only: str/bytes, " + + "buffer interfaces.") + + self.more = False + self.tracker = None + self.closed = False + if track: + self.tracker = zmq.MessageTracker() + + self.buffer = view(self.bytes) + + @property + def bytes(self): + data = _content(self._data) + return data + + def __len__(self): + return len(self.bytes) + + def __eq__(self, other): + return self.bytes == _content(other) + + def __str__(self): + if str is unicode: + return self.bytes.decode() + else: + return self.bytes + + @property + def done(self): + return True + +Message = Frame + +__all__ = ['Frame', 'Message'] diff --git a/zmq/backend/cffi/socket.py b/zmq/backend/cffi/socket.py new file mode 100644 index 0000000..3c42773 --- /dev/null +++ b/zmq/backend/cffi/socket.py @@ -0,0 +1,244 @@ +# coding: utf-8 +"""zmq Socket class""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +import random +import codecs + +import errno as errno_mod + +from ._cffi import (C, ffi, new_uint64_pointer, new_int64_pointer, + new_int_pointer, new_binary_data, value_uint64_pointer, + value_int64_pointer, value_int_pointer, value_binary_data, + IPC_PATH_MAX_LEN) + +from .message import Frame +from .constants import * + +import zmq +from zmq.error import ZMQError, _check_rc, _check_version +from zmq.utils.strtypes import unicode + + +def new_pointer_from_opt(option, length=0): + from zmq.sugar.constants import ( + int64_sockopts, bytes_sockopts, + ) + if option in int64_sockopts: + return new_int64_pointer() + elif option in bytes_sockopts: + return new_binary_data(length) + else: + # default + return new_int_pointer() + +def value_from_opt_pointer(option, opt_pointer, length=0): + from zmq.sugar.constants import ( + int64_sockopts, bytes_sockopts, + ) + if option in int64_sockopts: + return int(opt_pointer[0]) + elif option in bytes_sockopts: + return ffi.buffer(opt_pointer, length)[:] + else: + return int(opt_pointer[0]) + +def initialize_opt_pointer(option, value, length=0): + from zmq.sugar.constants import ( + int64_sockopts, bytes_sockopts, + ) + if option in int64_sockopts: + return value_int64_pointer(value) + elif option in bytes_sockopts: + return value_binary_data(value, length) + else: + return value_int_pointer(value) + + +class Socket(object): + context = None + socket_type = None + _zmq_socket = None + _closed = None + _ref = None + _shadow = False + + def __init__(self, context=None, socket_type=None, shadow=None): + self.context = context + if shadow is not None: + self._zmq_socket = ffi.cast("void *", shadow) + self._shadow = True + else: + self._shadow = False + self._zmq_socket = C.zmq_socket(context._zmq_ctx, socket_type) + if self._zmq_socket == ffi.NULL: + raise ZMQError() + self._closed = False + if context: + self._ref = context._add_socket(self) + + @property + def underlying(self): + """The address of the underlying libzmq socket""" + return int(ffi.cast('size_t', self._zmq_socket)) + + @property + def closed(self): + return self._closed + + def close(self, linger=None): + rc = 0 + if not self._closed and hasattr(self, '_zmq_socket'): + if self._zmq_socket is not None: + rc = C.zmq_close(self._zmq_socket) + self._closed = True + if self.context: + self.context._rm_socket(self._ref) + return rc + + def bind(self, address): + if isinstance(address, unicode): + address = address.encode('utf8') + rc = C.zmq_bind(self._zmq_socket, address) + if rc < 0: + if IPC_PATH_MAX_LEN and C.zmq_errno() == errno_mod.ENAMETOOLONG: + # py3compat: address is bytes, but msg wants str + if str is unicode: + address = address.decode('utf-8', 'replace') + path = address.split('://', 1)[-1] + msg = ('ipc path "{0}" is longer than {1} ' + 'characters (sizeof(sockaddr_un.sun_path)).' + .format(path, IPC_PATH_MAX_LEN)) + raise ZMQError(C.zmq_errno(), msg=msg) + else: + _check_rc(rc) + + def unbind(self, address): + _check_version((3,2), "unbind") + if isinstance(address, unicode): + address = address.encode('utf8') + rc = C.zmq_unbind(self._zmq_socket, address) + _check_rc(rc) + + def connect(self, address): + if isinstance(address, unicode): + address = address.encode('utf8') + rc = C.zmq_connect(self._zmq_socket, address) + _check_rc(rc) + + def disconnect(self, address): + _check_version((3,2), "disconnect") + if isinstance(address, unicode): + address = address.encode('utf8') + rc = C.zmq_disconnect(self._zmq_socket, address) + _check_rc(rc) + + def set(self, option, value): + length = None + if isinstance(value, unicode): + raise TypeError("unicode not allowed, use bytes") + + if isinstance(value, bytes): + if option not in zmq.constants.bytes_sockopts: + raise TypeError("not a bytes sockopt: %s" % option) + length = len(value) + + c_data = initialize_opt_pointer(option, value, length) + + c_value_pointer = c_data[0] + c_sizet = c_data[1] + + rc = C.zmq_setsockopt(self._zmq_socket, + option, + ffi.cast('void*', c_value_pointer), + c_sizet) + _check_rc(rc) + + def get(self, option): + c_data = new_pointer_from_opt(option, length=255) + + c_value_pointer = c_data[0] + c_sizet_pointer = c_data[1] + + rc = C.zmq_getsockopt(self._zmq_socket, + option, + c_value_pointer, + c_sizet_pointer) + _check_rc(rc) + + sz = c_sizet_pointer[0] + v = value_from_opt_pointer(option, c_value_pointer, sz) + if option != zmq.IDENTITY and option in zmq.constants.bytes_sockopts and v.endswith(b'\0'): + v = v[:-1] + return v + + def send(self, message, flags=0, copy=False, track=False): + if isinstance(message, unicode): + raise TypeError("Message must be in bytes, not an unicode Object") + + if isinstance(message, Frame): + message = message.bytes + + zmq_msg = ffi.new('zmq_msg_t*') + c_message = ffi.new('char[]', message) + rc = C.zmq_msg_init_size(zmq_msg, len(message)) + C.memcpy(C.zmq_msg_data(zmq_msg), c_message, len(message)) + + rc = C.zmq_msg_send(zmq_msg, self._zmq_socket, flags) + C.zmq_msg_close(zmq_msg) + _check_rc(rc) + + if track: + return zmq.MessageTracker() + + def recv(self, flags=0, copy=True, track=False): + zmq_msg = ffi.new('zmq_msg_t*') + C.zmq_msg_init(zmq_msg) + + rc = C.zmq_msg_recv(zmq_msg, self._zmq_socket, flags) + + if rc < 0: + C.zmq_msg_close(zmq_msg) + _check_rc(rc) + + _buffer = ffi.buffer(C.zmq_msg_data(zmq_msg), C.zmq_msg_size(zmq_msg)) + value = _buffer[:] + C.zmq_msg_close(zmq_msg) + + frame = Frame(value, track=track) + frame.more = self.getsockopt(RCVMORE) + + if copy: + return frame.bytes + else: + return frame + + def monitor(self, addr, events=-1): + """s.monitor(addr, flags) + + Start publishing socket events on inproc. + See libzmq docs for zmq_monitor for details. + + Note: requires libzmq >= 3.2 + + Parameters + ---------- + addr : str + The inproc url used for monitoring. Passing None as + the addr will cause an existing socket monitor to be + deregistered. + events : int [default: zmq.EVENT_ALL] + The zmq event bitmask for which events will be sent to the monitor. + """ + + _check_version((3,2), "monitor") + if events < 0: + events = zmq.EVENT_ALL + if addr is None: + addr = ffi.NULL + rc = C.zmq_socket_monitor(self._zmq_socket, addr, events) + + +__all__ = ['Socket', 'IPC_PATH_MAX_LEN'] diff --git a/zmq/backend/cffi/utils.py b/zmq/backend/cffi/utils.py new file mode 100644 index 0000000..fde7827 --- /dev/null +++ b/zmq/backend/cffi/utils.py @@ -0,0 +1,62 @@ +# coding: utf-8 +"""miscellaneous zmq_utils wrapping""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +from ._cffi import ffi, C + +from zmq.error import ZMQError, _check_rc, _check_version +from zmq.utils.strtypes import unicode + +def has(capability): + """Check for zmq capability by name (e.g. 'ipc', 'curve') + + .. versionadded:: libzmq-4.1 + .. versionadded:: 14.1 + """ + _check_version((4,1), 'zmq.has') + if isinstance(capability, unicode): + capability = capability.encode('utf8') + return bool(C.zmq_has(capability)) + +def curve_keypair(): + """generate a Z85 keypair for use with zmq.CURVE security + + Requires libzmq (≥ 4.0) to have been linked with libsodium. + + Returns + ------- + (public, secret) : two bytestrings + The public and private keypair as 40 byte z85-encoded bytestrings. + """ + _check_version((3,2), "monitor") + public = ffi.new('char[64]') + private = ffi.new('char[64]') + rc = C.zmq_curve_keypair(public, private) + _check_rc(rc) + return ffi.buffer(public)[:40], ffi.buffer(private)[:40] + + +class Stopwatch(object): + def __init__(self): + self.watch = ffi.NULL + + def start(self): + if self.watch == ffi.NULL: + self.watch = C.zmq_stopwatch_start() + else: + raise ZMQError('Stopwatch is already runing.') + + def stop(self): + if self.watch == ffi.NULL: + raise ZMQError('Must start the Stopwatch before calling stop.') + else: + time = C.zmq_stopwatch_stop(self.watch) + self.watch = ffi.NULL + return time + + def sleep(self, seconds): + C.zmq_sleep(seconds) + +__all__ = ['has', 'curve_keypair', 'Stopwatch'] diff --git a/zmq/backend/cython/__init__.py b/zmq/backend/cython/__init__.py new file mode 100644 index 0000000..e535818 --- /dev/null +++ b/zmq/backend/cython/__init__.py @@ -0,0 +1,23 @@ +"""Python bindings for core 0MQ objects.""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Lesser GNU Public License (LGPL). + +from . import (constants, error, message, context, + socket, utils, _poll, _version, _device ) + +__all__ = [] +for submod in (constants, error, message, context, + socket, utils, _poll, _version, _device): + __all__.extend(submod.__all__) + +from .constants import * +from .error import * +from .message import * +from .context import * +from .socket import * +from ._poll import * +from .utils import * +from ._device import * +from ._version import * + diff --git a/zmq/backend/cython/_device.pyx b/zmq/backend/cython/_device.pyx new file mode 100644 index 0000000..eea0a00 --- /dev/null +++ b/zmq/backend/cython/_device.pyx @@ -0,0 +1,89 @@ +"""Python binding for 0MQ device function.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport zmq_device, zmq_proxy, ZMQ_VERSION_MAJOR +from zmq.backend.cython.socket cimport Socket as cSocket +from zmq.backend.cython.checkrc cimport _check_rc + +#----------------------------------------------------------------------------- +# Basic device API +#----------------------------------------------------------------------------- + +def device(int device_type, cSocket frontend, cSocket backend=None): + """device(device_type, frontend, backend) + + Start a zeromq device. + + .. deprecated:: libzmq-3.2 + Use zmq.proxy + + Parameters + ---------- + device_type : (QUEUE, FORWARDER, STREAMER) + The type of device to start. + frontend : Socket + The Socket instance for the incoming traffic. + backend : Socket + The Socket instance for the outbound traffic. + """ + if ZMQ_VERSION_MAJOR >= 3: + return proxy(frontend, backend) + + cdef int rc = 0 + with nogil: + rc = zmq_device(device_type, frontend.handle, backend.handle) + _check_rc(rc) + return rc + +def proxy(cSocket frontend, cSocket backend, cSocket capture=None): + """proxy(frontend, backend, capture) + + Start a zeromq proxy (replacement for device). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + frontend : Socket + The Socket instance for the incoming traffic. + backend : Socket + The Socket instance for the outbound traffic. + capture : Socket (optional) + The Socket instance for capturing traffic. + """ + cdef int rc = 0 + cdef void* capture_handle + if isinstance(capture, cSocket): + capture_handle = capture.handle + else: + capture_handle = NULL + with nogil: + rc = zmq_proxy(frontend.handle, backend.handle, capture_handle) + _check_rc(rc) + return rc + +__all__ = ['device', 'proxy'] + diff --git a/zmq/backend/cython/_poll.pyx b/zmq/backend/cython/_poll.pyx new file mode 100644 index 0000000..5bed46b --- /dev/null +++ b/zmq/backend/cython/_poll.pyx @@ -0,0 +1,137 @@ +"""0MQ polling related functions and classes.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libc.stdlib cimport free, malloc + +from libzmq cimport zmq_pollitem_t, ZMQ_VERSION_MAJOR +from libzmq cimport zmq_poll as zmq_poll_c +from socket cimport Socket + +import sys + +from zmq.backend.cython.checkrc cimport _check_rc + +#----------------------------------------------------------------------------- +# Polling related methods +#----------------------------------------------------------------------------- + +# version-independent typecheck for int/long +if sys.version_info[0] >= 3: + int_t = int +else: + int_t = (int,long) + + +def zmq_poll(sockets, long timeout=-1): + """zmq_poll(sockets, timeout=-1) + + Poll a set of 0MQ sockets, native file descs. or sockets. + + Parameters + ---------- + sockets : list of tuples of (socket, flags) + Each element of this list is a two-tuple containing a socket + and a flags. The socket may be a 0MQ socket or any object with + a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting + for incoming messages), zmq.POLLOUT (for detecting that send is OK) + or zmq.POLLIN|zmq.POLLOUT for detecting both. + timeout : int + The number of milliseconds to poll for. Negative means no timeout. + """ + cdef int rc, i + cdef zmq_pollitem_t *pollitems = NULL + cdef int nsockets = len(sockets) + cdef Socket current_socket + + if nsockets == 0: + return [] + + pollitems = malloc(nsockets*sizeof(zmq_pollitem_t)) + if pollitems == NULL: + raise MemoryError("Could not allocate poll items") + + if ZMQ_VERSION_MAJOR < 3: + # timeout is us in 2.x, ms in 3.x + # expected input is ms (matches 3.x) + timeout = 1000*timeout + + for i in range(nsockets): + s, events = sockets[i] + if isinstance(s, Socket): + pollitems[i].socket = (s).handle + pollitems[i].events = events + pollitems[i].revents = 0 + elif isinstance(s, int_t): + pollitems[i].socket = NULL + pollitems[i].fd = s + pollitems[i].events = events + pollitems[i].revents = 0 + elif hasattr(s, 'fileno'): + try: + fileno = int(s.fileno()) + except: + free(pollitems) + raise ValueError('fileno() must return a valid integer fd') + else: + pollitems[i].socket = NULL + pollitems[i].fd = fileno + pollitems[i].events = events + pollitems[i].revents = 0 + else: + free(pollitems) + raise TypeError( + "Socket must be a 0MQ socket, an integer fd or have " + "a fileno() method: %r" % s + ) + + + with nogil: + rc = zmq_poll_c(pollitems, nsockets, timeout) + + if rc < 0: + free(pollitems) + _check_rc(rc) + + results = [] + for i in range(nsockets): + revents = pollitems[i].revents + # for compatibility with select.poll: + # - only return sockets with non-zero status + # - return the fd for plain sockets + if revents > 0: + if pollitems[i].socket != NULL: + s = sockets[i][0] + else: + s = pollitems[i].fd + results.append((s, revents)) + + free(pollitems) + return results + +#----------------------------------------------------------------------------- +# Symbols to export +#----------------------------------------------------------------------------- + +__all__ = [ 'zmq_poll' ] diff --git a/zmq/backend/cython/_version.pyx b/zmq/backend/cython/_version.pyx new file mode 100644 index 0000000..02cf6fc --- /dev/null +++ b/zmq/backend/cython/_version.pyx @@ -0,0 +1,43 @@ +"""PyZMQ and 0MQ version functions.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport _zmq_version + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +def zmq_version_info(): + """zmq_version_info() + + Return the version of ZeroMQ itself as a 3-tuple of ints. + """ + cdef int major, minor, patch + _zmq_version(&major, &minor, &patch) + return (major, minor, patch) + + +__all__ = ['zmq_version_info'] + diff --git a/zmq/backend/cython/checkrc.pxd b/zmq/backend/cython/checkrc.pxd new file mode 100644 index 0000000..3bf69fc --- /dev/null +++ b/zmq/backend/cython/checkrc.pxd @@ -0,0 +1,23 @@ +from libc.errno cimport EINTR, EAGAIN +from cpython cimport PyErr_CheckSignals +from libzmq cimport zmq_errno, ZMQ_ETERM + +cdef inline int _check_rc(int rc) except -1: + """internal utility for checking zmq return condition + + and raising the appropriate Exception class + """ + cdef int errno = zmq_errno() + PyErr_CheckSignals() + if rc < 0: + if errno == EAGAIN: + from zmq.error import Again + raise Again(errno) + elif errno == ZMQ_ETERM: + from zmq.error import ContextTerminated + raise ContextTerminated(errno) + else: + from zmq.error import ZMQError + raise ZMQError(errno) + # return -1 + return 0 diff --git a/zmq/backend/cython/constant_enums.pxi b/zmq/backend/cython/constant_enums.pxi new file mode 100644 index 0000000..3d0efd9 --- /dev/null +++ b/zmq/backend/cython/constant_enums.pxi @@ -0,0 +1,156 @@ +cdef extern from "zmq.h" nogil: + + enum: ZMQ_VERSION + enum: ZMQ_VERSION_MAJOR + enum: ZMQ_VERSION_MINOR + enum: ZMQ_VERSION_PATCH + enum: ZMQ_NOBLOCK + enum: ZMQ_DONTWAIT + enum: ZMQ_POLLIN + enum: ZMQ_POLLOUT + enum: ZMQ_POLLERR + enum: ZMQ_SNDMORE + enum: ZMQ_STREAMER + enum: ZMQ_FORWARDER + enum: ZMQ_QUEUE + enum: ZMQ_IO_THREADS_DFLT + enum: ZMQ_MAX_SOCKETS_DFLT + enum: ZMQ_POLLITEMS_DFLT + enum: ZMQ_THREAD_PRIORITY_DFLT + enum: ZMQ_THREAD_SCHED_POLICY_DFLT + enum: ZMQ_PAIR + enum: ZMQ_PUB + enum: ZMQ_SUB + enum: ZMQ_REQ + enum: ZMQ_REP + enum: ZMQ_DEALER + enum: ZMQ_ROUTER + enum: ZMQ_XREQ + enum: ZMQ_XREP + enum: ZMQ_PULL + enum: ZMQ_PUSH + enum: ZMQ_XPUB + enum: ZMQ_XSUB + enum: ZMQ_UPSTREAM + enum: ZMQ_DOWNSTREAM + enum: ZMQ_STREAM + enum: ZMQ_EVENT_CONNECTED + enum: ZMQ_EVENT_CONNECT_DELAYED + enum: ZMQ_EVENT_CONNECT_RETRIED + enum: ZMQ_EVENT_LISTENING + enum: ZMQ_EVENT_BIND_FAILED + enum: ZMQ_EVENT_ACCEPTED + enum: ZMQ_EVENT_ACCEPT_FAILED + enum: ZMQ_EVENT_CLOSED + enum: ZMQ_EVENT_CLOSE_FAILED + enum: ZMQ_EVENT_DISCONNECTED + enum: ZMQ_EVENT_ALL + enum: ZMQ_EVENT_MONITOR_STOPPED + enum: ZMQ_NULL + enum: ZMQ_PLAIN + enum: ZMQ_CURVE + enum: ZMQ_GSSAPI + enum: ZMQ_EAGAIN "EAGAIN" + enum: ZMQ_EINVAL "EINVAL" + enum: ZMQ_EFAULT "EFAULT" + enum: ZMQ_ENOMEM "ENOMEM" + enum: ZMQ_ENODEV "ENODEV" + enum: ZMQ_EMSGSIZE "EMSGSIZE" + enum: ZMQ_EAFNOSUPPORT "EAFNOSUPPORT" + enum: ZMQ_ENETUNREACH "ENETUNREACH" + enum: ZMQ_ECONNABORTED "ECONNABORTED" + enum: ZMQ_ECONNRESET "ECONNRESET" + enum: ZMQ_ENOTCONN "ENOTCONN" + enum: ZMQ_ETIMEDOUT "ETIMEDOUT" + enum: ZMQ_EHOSTUNREACH "EHOSTUNREACH" + enum: ZMQ_ENETRESET "ENETRESET" + enum: ZMQ_HAUSNUMERO + enum: ZMQ_ENOTSUP "ENOTSUP" + enum: ZMQ_EPROTONOSUPPORT "EPROTONOSUPPORT" + enum: ZMQ_ENOBUFS "ENOBUFS" + enum: ZMQ_ENETDOWN "ENETDOWN" + enum: ZMQ_EADDRINUSE "EADDRINUSE" + enum: ZMQ_EADDRNOTAVAIL "EADDRNOTAVAIL" + enum: ZMQ_ECONNREFUSED "ECONNREFUSED" + enum: ZMQ_EINPROGRESS "EINPROGRESS" + enum: ZMQ_ENOTSOCK "ENOTSOCK" + enum: ZMQ_EFSM "EFSM" + enum: ZMQ_ENOCOMPATPROTO "ENOCOMPATPROTO" + enum: ZMQ_ETERM "ETERM" + enum: ZMQ_EMTHREAD "EMTHREAD" + enum: ZMQ_IO_THREADS + enum: ZMQ_MAX_SOCKETS + enum: ZMQ_SOCKET_LIMIT + enum: ZMQ_THREAD_PRIORITY + enum: ZMQ_THREAD_SCHED_POLICY + enum: ZMQ_IDENTITY + enum: ZMQ_SUBSCRIBE + enum: ZMQ_UNSUBSCRIBE + enum: ZMQ_LAST_ENDPOINT + enum: ZMQ_TCP_ACCEPT_FILTER + enum: ZMQ_PLAIN_USERNAME + enum: ZMQ_PLAIN_PASSWORD + enum: ZMQ_CURVE_PUBLICKEY + enum: ZMQ_CURVE_SECRETKEY + enum: ZMQ_CURVE_SERVERKEY + enum: ZMQ_ZAP_DOMAIN + enum: ZMQ_CONNECT_RID + enum: ZMQ_GSSAPI_PRINCIPAL + enum: ZMQ_GSSAPI_SERVICE_PRINCIPAL + enum: ZMQ_SOCKS_PROXY + enum: ZMQ_FD + enum: ZMQ_IDENTITY_FD + enum: ZMQ_RECONNECT_IVL_MAX + enum: ZMQ_SNDTIMEO + enum: ZMQ_RCVTIMEO + enum: ZMQ_SNDHWM + enum: ZMQ_RCVHWM + enum: ZMQ_MULTICAST_HOPS + enum: ZMQ_IPV4ONLY + enum: ZMQ_ROUTER_BEHAVIOR + enum: ZMQ_TCP_KEEPALIVE + enum: ZMQ_TCP_KEEPALIVE_CNT + enum: ZMQ_TCP_KEEPALIVE_IDLE + enum: ZMQ_TCP_KEEPALIVE_INTVL + enum: ZMQ_DELAY_ATTACH_ON_CONNECT + enum: ZMQ_XPUB_VERBOSE + enum: ZMQ_EVENTS + enum: ZMQ_TYPE + enum: ZMQ_LINGER + enum: ZMQ_RECONNECT_IVL + enum: ZMQ_BACKLOG + enum: ZMQ_ROUTER_MANDATORY + enum: ZMQ_FAIL_UNROUTABLE + enum: ZMQ_ROUTER_RAW + enum: ZMQ_IMMEDIATE + enum: ZMQ_IPV6 + enum: ZMQ_MECHANISM + enum: ZMQ_PLAIN_SERVER + enum: ZMQ_CURVE_SERVER + enum: ZMQ_PROBE_ROUTER + enum: ZMQ_REQ_RELAXED + enum: ZMQ_REQ_CORRELATE + enum: ZMQ_CONFLATE + enum: ZMQ_ROUTER_HANDOVER + enum: ZMQ_TOS + enum: ZMQ_IPC_FILTER_PID + enum: ZMQ_IPC_FILTER_UID + enum: ZMQ_IPC_FILTER_GID + enum: ZMQ_GSSAPI_SERVER + enum: ZMQ_GSSAPI_PLAINTEXT + enum: ZMQ_HANDSHAKE_IVL + enum: ZMQ_XPUB_NODROP + enum: ZMQ_AFFINITY + enum: ZMQ_MAXMSGSIZE + enum: ZMQ_HWM + enum: ZMQ_SWAP + enum: ZMQ_MCAST_LOOP + enum: ZMQ_RECOVERY_IVL_MSEC + enum: ZMQ_RATE + enum: ZMQ_RECOVERY_IVL + enum: ZMQ_SNDBUF + enum: ZMQ_RCVBUF + enum: ZMQ_RCVMORE + enum: ZMQ_MORE + enum: ZMQ_SRCFD + enum: ZMQ_SHARED diff --git a/zmq/backend/cython/constants.pxi b/zmq/backend/cython/constants.pxi new file mode 100644 index 0000000..606e6cb --- /dev/null +++ b/zmq/backend/cython/constants.pxi @@ -0,0 +1,318 @@ +#----------------------------------------------------------------------------- +# Python module level constants +#----------------------------------------------------------------------------- + +VERSION = ZMQ_VERSION +VERSION_MAJOR = ZMQ_VERSION_MAJOR +VERSION_MINOR = ZMQ_VERSION_MINOR +VERSION_PATCH = ZMQ_VERSION_PATCH +NOBLOCK = ZMQ_NOBLOCK +DONTWAIT = ZMQ_DONTWAIT +POLLIN = ZMQ_POLLIN +POLLOUT = ZMQ_POLLOUT +POLLERR = ZMQ_POLLERR +SNDMORE = ZMQ_SNDMORE +STREAMER = ZMQ_STREAMER +FORWARDER = ZMQ_FORWARDER +QUEUE = ZMQ_QUEUE +IO_THREADS_DFLT = ZMQ_IO_THREADS_DFLT +MAX_SOCKETS_DFLT = ZMQ_MAX_SOCKETS_DFLT +POLLITEMS_DFLT = ZMQ_POLLITEMS_DFLT +THREAD_PRIORITY_DFLT = ZMQ_THREAD_PRIORITY_DFLT +THREAD_SCHED_POLICY_DFLT = ZMQ_THREAD_SCHED_POLICY_DFLT +PAIR = ZMQ_PAIR +PUB = ZMQ_PUB +SUB = ZMQ_SUB +REQ = ZMQ_REQ +REP = ZMQ_REP +DEALER = ZMQ_DEALER +ROUTER = ZMQ_ROUTER +XREQ = ZMQ_XREQ +XREP = ZMQ_XREP +PULL = ZMQ_PULL +PUSH = ZMQ_PUSH +XPUB = ZMQ_XPUB +XSUB = ZMQ_XSUB +UPSTREAM = ZMQ_UPSTREAM +DOWNSTREAM = ZMQ_DOWNSTREAM +STREAM = ZMQ_STREAM +EVENT_CONNECTED = ZMQ_EVENT_CONNECTED +EVENT_CONNECT_DELAYED = ZMQ_EVENT_CONNECT_DELAYED +EVENT_CONNECT_RETRIED = ZMQ_EVENT_CONNECT_RETRIED +EVENT_LISTENING = ZMQ_EVENT_LISTENING +EVENT_BIND_FAILED = ZMQ_EVENT_BIND_FAILED +EVENT_ACCEPTED = ZMQ_EVENT_ACCEPTED +EVENT_ACCEPT_FAILED = ZMQ_EVENT_ACCEPT_FAILED +EVENT_CLOSED = ZMQ_EVENT_CLOSED +EVENT_CLOSE_FAILED = ZMQ_EVENT_CLOSE_FAILED +EVENT_DISCONNECTED = ZMQ_EVENT_DISCONNECTED +EVENT_ALL = ZMQ_EVENT_ALL +EVENT_MONITOR_STOPPED = ZMQ_EVENT_MONITOR_STOPPED +globals()['NULL'] = ZMQ_NULL +PLAIN = ZMQ_PLAIN +CURVE = ZMQ_CURVE +GSSAPI = ZMQ_GSSAPI +EAGAIN = ZMQ_EAGAIN +EINVAL = ZMQ_EINVAL +EFAULT = ZMQ_EFAULT +ENOMEM = ZMQ_ENOMEM +ENODEV = ZMQ_ENODEV +EMSGSIZE = ZMQ_EMSGSIZE +EAFNOSUPPORT = ZMQ_EAFNOSUPPORT +ENETUNREACH = ZMQ_ENETUNREACH +ECONNABORTED = ZMQ_ECONNABORTED +ECONNRESET = ZMQ_ECONNRESET +ENOTCONN = ZMQ_ENOTCONN +ETIMEDOUT = ZMQ_ETIMEDOUT +EHOSTUNREACH = ZMQ_EHOSTUNREACH +ENETRESET = ZMQ_ENETRESET +HAUSNUMERO = ZMQ_HAUSNUMERO +ENOTSUP = ZMQ_ENOTSUP +EPROTONOSUPPORT = ZMQ_EPROTONOSUPPORT +ENOBUFS = ZMQ_ENOBUFS +ENETDOWN = ZMQ_ENETDOWN +EADDRINUSE = ZMQ_EADDRINUSE +EADDRNOTAVAIL = ZMQ_EADDRNOTAVAIL +ECONNREFUSED = ZMQ_ECONNREFUSED +EINPROGRESS = ZMQ_EINPROGRESS +ENOTSOCK = ZMQ_ENOTSOCK +EFSM = ZMQ_EFSM +ENOCOMPATPROTO = ZMQ_ENOCOMPATPROTO +ETERM = ZMQ_ETERM +EMTHREAD = ZMQ_EMTHREAD +IO_THREADS = ZMQ_IO_THREADS +MAX_SOCKETS = ZMQ_MAX_SOCKETS +SOCKET_LIMIT = ZMQ_SOCKET_LIMIT +THREAD_PRIORITY = ZMQ_THREAD_PRIORITY +THREAD_SCHED_POLICY = ZMQ_THREAD_SCHED_POLICY +IDENTITY = ZMQ_IDENTITY +SUBSCRIBE = ZMQ_SUBSCRIBE +UNSUBSCRIBE = ZMQ_UNSUBSCRIBE +LAST_ENDPOINT = ZMQ_LAST_ENDPOINT +TCP_ACCEPT_FILTER = ZMQ_TCP_ACCEPT_FILTER +PLAIN_USERNAME = ZMQ_PLAIN_USERNAME +PLAIN_PASSWORD = ZMQ_PLAIN_PASSWORD +CURVE_PUBLICKEY = ZMQ_CURVE_PUBLICKEY +CURVE_SECRETKEY = ZMQ_CURVE_SECRETKEY +CURVE_SERVERKEY = ZMQ_CURVE_SERVERKEY +ZAP_DOMAIN = ZMQ_ZAP_DOMAIN +CONNECT_RID = ZMQ_CONNECT_RID +GSSAPI_PRINCIPAL = ZMQ_GSSAPI_PRINCIPAL +GSSAPI_SERVICE_PRINCIPAL = ZMQ_GSSAPI_SERVICE_PRINCIPAL +SOCKS_PROXY = ZMQ_SOCKS_PROXY +FD = ZMQ_FD +IDENTITY_FD = ZMQ_IDENTITY_FD +RECONNECT_IVL_MAX = ZMQ_RECONNECT_IVL_MAX +SNDTIMEO = ZMQ_SNDTIMEO +RCVTIMEO = ZMQ_RCVTIMEO +SNDHWM = ZMQ_SNDHWM +RCVHWM = ZMQ_RCVHWM +MULTICAST_HOPS = ZMQ_MULTICAST_HOPS +IPV4ONLY = ZMQ_IPV4ONLY +ROUTER_BEHAVIOR = ZMQ_ROUTER_BEHAVIOR +TCP_KEEPALIVE = ZMQ_TCP_KEEPALIVE +TCP_KEEPALIVE_CNT = ZMQ_TCP_KEEPALIVE_CNT +TCP_KEEPALIVE_IDLE = ZMQ_TCP_KEEPALIVE_IDLE +TCP_KEEPALIVE_INTVL = ZMQ_TCP_KEEPALIVE_INTVL +DELAY_ATTACH_ON_CONNECT = ZMQ_DELAY_ATTACH_ON_CONNECT +XPUB_VERBOSE = ZMQ_XPUB_VERBOSE +EVENTS = ZMQ_EVENTS +TYPE = ZMQ_TYPE +LINGER = ZMQ_LINGER +RECONNECT_IVL = ZMQ_RECONNECT_IVL +BACKLOG = ZMQ_BACKLOG +ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY +FAIL_UNROUTABLE = ZMQ_FAIL_UNROUTABLE +ROUTER_RAW = ZMQ_ROUTER_RAW +IMMEDIATE = ZMQ_IMMEDIATE +IPV6 = ZMQ_IPV6 +MECHANISM = ZMQ_MECHANISM +PLAIN_SERVER = ZMQ_PLAIN_SERVER +CURVE_SERVER = ZMQ_CURVE_SERVER +PROBE_ROUTER = ZMQ_PROBE_ROUTER +REQ_RELAXED = ZMQ_REQ_RELAXED +REQ_CORRELATE = ZMQ_REQ_CORRELATE +CONFLATE = ZMQ_CONFLATE +ROUTER_HANDOVER = ZMQ_ROUTER_HANDOVER +TOS = ZMQ_TOS +IPC_FILTER_PID = ZMQ_IPC_FILTER_PID +IPC_FILTER_UID = ZMQ_IPC_FILTER_UID +IPC_FILTER_GID = ZMQ_IPC_FILTER_GID +GSSAPI_SERVER = ZMQ_GSSAPI_SERVER +GSSAPI_PLAINTEXT = ZMQ_GSSAPI_PLAINTEXT +HANDSHAKE_IVL = ZMQ_HANDSHAKE_IVL +XPUB_NODROP = ZMQ_XPUB_NODROP +AFFINITY = ZMQ_AFFINITY +MAXMSGSIZE = ZMQ_MAXMSGSIZE +HWM = ZMQ_HWM +SWAP = ZMQ_SWAP +MCAST_LOOP = ZMQ_MCAST_LOOP +RECOVERY_IVL_MSEC = ZMQ_RECOVERY_IVL_MSEC +RATE = ZMQ_RATE +RECOVERY_IVL = ZMQ_RECOVERY_IVL +SNDBUF = ZMQ_SNDBUF +RCVBUF = ZMQ_RCVBUF +RCVMORE = ZMQ_RCVMORE +MORE = ZMQ_MORE +SRCFD = ZMQ_SRCFD +SHARED = ZMQ_SHARED + +#----------------------------------------------------------------------------- +# Symbols to export +#----------------------------------------------------------------------------- +__all__ = [ + "VERSION", + "VERSION_MAJOR", + "VERSION_MINOR", + "VERSION_PATCH", + "NOBLOCK", + "DONTWAIT", + "POLLIN", + "POLLOUT", + "POLLERR", + "SNDMORE", + "STREAMER", + "FORWARDER", + "QUEUE", + "IO_THREADS_DFLT", + "MAX_SOCKETS_DFLT", + "POLLITEMS_DFLT", + "THREAD_PRIORITY_DFLT", + "THREAD_SCHED_POLICY_DFLT", + "PAIR", + "PUB", + "SUB", + "REQ", + "REP", + "DEALER", + "ROUTER", + "XREQ", + "XREP", + "PULL", + "PUSH", + "XPUB", + "XSUB", + "UPSTREAM", + "DOWNSTREAM", + "STREAM", + "EVENT_CONNECTED", + "EVENT_CONNECT_DELAYED", + "EVENT_CONNECT_RETRIED", + "EVENT_LISTENING", + "EVENT_BIND_FAILED", + "EVENT_ACCEPTED", + "EVENT_ACCEPT_FAILED", + "EVENT_CLOSED", + "EVENT_CLOSE_FAILED", + "EVENT_DISCONNECTED", + "EVENT_ALL", + "EVENT_MONITOR_STOPPED", + "NULL", + "PLAIN", + "CURVE", + "GSSAPI", + "EAGAIN", + "EINVAL", + "EFAULT", + "ENOMEM", + "ENODEV", + "EMSGSIZE", + "EAFNOSUPPORT", + "ENETUNREACH", + "ECONNABORTED", + "ECONNRESET", + "ENOTCONN", + "ETIMEDOUT", + "EHOSTUNREACH", + "ENETRESET", + "HAUSNUMERO", + "ENOTSUP", + "EPROTONOSUPPORT", + "ENOBUFS", + "ENETDOWN", + "EADDRINUSE", + "EADDRNOTAVAIL", + "ECONNREFUSED", + "EINPROGRESS", + "ENOTSOCK", + "EFSM", + "ENOCOMPATPROTO", + "ETERM", + "EMTHREAD", + "IO_THREADS", + "MAX_SOCKETS", + "SOCKET_LIMIT", + "THREAD_PRIORITY", + "THREAD_SCHED_POLICY", + "IDENTITY", + "SUBSCRIBE", + "UNSUBSCRIBE", + "LAST_ENDPOINT", + "TCP_ACCEPT_FILTER", + "PLAIN_USERNAME", + "PLAIN_PASSWORD", + "CURVE_PUBLICKEY", + "CURVE_SECRETKEY", + "CURVE_SERVERKEY", + "ZAP_DOMAIN", + "CONNECT_RID", + "GSSAPI_PRINCIPAL", + "GSSAPI_SERVICE_PRINCIPAL", + "SOCKS_PROXY", + "FD", + "IDENTITY_FD", + "RECONNECT_IVL_MAX", + "SNDTIMEO", + "RCVTIMEO", + "SNDHWM", + "RCVHWM", + "MULTICAST_HOPS", + "IPV4ONLY", + "ROUTER_BEHAVIOR", + "TCP_KEEPALIVE", + "TCP_KEEPALIVE_CNT", + "TCP_KEEPALIVE_IDLE", + "TCP_KEEPALIVE_INTVL", + "DELAY_ATTACH_ON_CONNECT", + "XPUB_VERBOSE", + "EVENTS", + "TYPE", + "LINGER", + "RECONNECT_IVL", + "BACKLOG", + "ROUTER_MANDATORY", + "FAIL_UNROUTABLE", + "ROUTER_RAW", + "IMMEDIATE", + "IPV6", + "MECHANISM", + "PLAIN_SERVER", + "CURVE_SERVER", + "PROBE_ROUTER", + "REQ_RELAXED", + "REQ_CORRELATE", + "CONFLATE", + "ROUTER_HANDOVER", + "TOS", + "IPC_FILTER_PID", + "IPC_FILTER_UID", + "IPC_FILTER_GID", + "GSSAPI_SERVER", + "GSSAPI_PLAINTEXT", + "HANDSHAKE_IVL", + "XPUB_NODROP", + "AFFINITY", + "MAXMSGSIZE", + "HWM", + "SWAP", + "MCAST_LOOP", + "RECOVERY_IVL_MSEC", + "RATE", + "RECOVERY_IVL", + "SNDBUF", + "RCVBUF", + "RCVMORE", + "MORE", + "SRCFD", + "SHARED", +] diff --git a/zmq/backend/cython/constants.pyx b/zmq/backend/cython/constants.pyx new file mode 100644 index 0000000..f924f03 --- /dev/null +++ b/zmq/backend/cython/constants.pyx @@ -0,0 +1,32 @@ +"""0MQ Constants.""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from libzmq cimport * + +#----------------------------------------------------------------------------- +# Python module level constants +#----------------------------------------------------------------------------- + +include "constants.pxi" diff --git a/zmq/backend/cython/context.pxd b/zmq/backend/cython/context.pxd new file mode 100644 index 0000000..9c9267a --- /dev/null +++ b/zmq/backend/cython/context.pxd @@ -0,0 +1,41 @@ +"""0MQ Context class declaration.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +cdef class Context: + + cdef object __weakref__ # enable weakref + cdef void *handle # The C handle for the underlying zmq object. + cdef bint _shadow # whether the Context is a shadow wrapper of another + cdef void **_sockets # A C-array containg socket handles + cdef size_t _n_sockets # the number of sockets + cdef size_t _max_sockets # the size of the _sockets array + cdef int _pid # the pid of the process which created me (for fork safety) + + cdef public bint closed # bool property for a closed context. + cdef inline int _term(self) + # helpers for events on _sockets in Socket.__cinit__()/close() + cdef inline void _add_socket(self, void* handle) + cdef inline void _remove_socket(self, void* handle) + diff --git a/zmq/backend/cython/context.pyx b/zmq/backend/cython/context.pyx new file mode 100644 index 0000000..b527e5d --- /dev/null +++ b/zmq/backend/cython/context.pyx @@ -0,0 +1,243 @@ +"""0MQ Context class.""" +# coding: utf-8 + +# Copyright (c) PyZMQ Developers. +# Distributed under the terms of the Lesser GNU Public License (LGPL). + +from libc.stdlib cimport free, malloc, realloc + +from libzmq cimport * + +cdef extern from "getpid_compat.h": + int getpid() + +from zmq.error import ZMQError +from zmq.backend.cython.checkrc cimport _check_rc + + +_instance = None + +cdef class Context: + """Context(io_threads=1) + + Manage the lifecycle of a 0MQ context. + + Parameters + ---------- + io_threads : int + The number of IO threads. + """ + + # no-op for the signature + def __init__(self, io_threads=1, shadow=0): + pass + + def __cinit__(self, int io_threads=1, size_t shadow=0, **kwargs): + self.handle = NULL + self._sockets = NULL + if shadow: + self.handle = shadow + self._shadow = True + else: + self._shadow = False + if ZMQ_VERSION_MAJOR >= 3: + self.handle = zmq_ctx_new() + else: + self.handle = zmq_init(io_threads) + + if self.handle == NULL: + raise ZMQError() + + cdef int rc = 0 + if ZMQ_VERSION_MAJOR >= 3 and not self._shadow: + rc = zmq_ctx_set(self.handle, ZMQ_IO_THREADS, io_threads) + _check_rc(rc) + + self.closed = False + self._n_sockets = 0 + self._max_sockets = 32 + + self._sockets = malloc(self._max_sockets*sizeof(void *)) + if self._sockets == NULL: + raise MemoryError("Could not allocate _sockets array") + + self._pid = getpid() + + def __dealloc__(self): + """don't touch members in dealloc, just cleanup allocations""" + cdef int rc + if self._sockets != NULL: + free(self._sockets) + self._sockets = NULL + self._n_sockets = 0 + + # we can't call object methods in dealloc as it + # might already be partially deleted + if not self._shadow: + self._term() + + cdef inline void _add_socket(self, void* handle): + """Add a socket handle to be closed when Context terminates. + + This is to be called in the Socket constructor. + """ + if self._n_sockets >= self._max_sockets: + self._max_sockets *= 2 + self._sockets = realloc(self._sockets, self._max_sockets*sizeof(void *)) + if self._sockets == NULL: + raise MemoryError("Could not reallocate _sockets array") + + self._sockets[self._n_sockets] = handle + self._n_sockets += 1 + + cdef inline void _remove_socket(self, void* handle): + """Remove a socket from the collected handles. + + This should be called by Socket.close, to prevent trying to + close a socket a second time. + """ + cdef bint found = False + + for idx in range(self._n_sockets): + if self._sockets[idx] == handle: + found=True + break + + if found: + self._n_sockets -= 1 + if self._n_sockets: + # move last handle to closed socket's index + self._sockets[idx] = self._sockets[self._n_sockets] + + + @property + def underlying(self): + """The address of the underlying libzmq context""" + return self.handle + + # backward-compat, though nobody is using it + _handle = underlying + + cdef inline int _term(self): + cdef int rc=0 + if self.handle != NULL and not self.closed and getpid() == self._pid: + with nogil: + rc = zmq_ctx_destroy(self.handle) + self.handle = NULL + return rc + + def term(self): + """ctx.term() + + Close or terminate the context. + + This can be called to close the context by hand. If this is not called, + the context will automatically be closed when it is garbage collected. + """ + cdef int rc + rc = self._term() + self.closed = True + + def set(self, int option, optval): + """ctx.set(option, optval) + + Set a context option. + + See the 0MQ API documentation for zmq_ctx_set + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + option : int + The option to set. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IO_THREADS, zmq.MAX_SOCKETS + + optval : int + The value of the option to set. + """ + cdef int optval_int_c + cdef int rc + cdef char* optval_c + + if self.closed: + raise RuntimeError("Context has been destroyed") + + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int_c = optval + rc = zmq_ctx_set(self.handle, option, optval_int_c) + _check_rc(rc) + + def get(self, int option): + """ctx.get(option) + + Get the value of a context option. + + See the 0MQ API documentation for zmq_ctx_get + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + option : int + The option to get. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IO_THREADS, zmq.MAX_SOCKETS + + Returns + ------- + optval : int + The value of the option as an integer. + """ + cdef int optval_int_c + cdef size_t sz + cdef int rc + + if self.closed: + raise RuntimeError("Context has been destroyed") + + rc = zmq_ctx_get(self.handle, option) + _check_rc(rc) + + return rc + + def destroy(self, linger=None): + """ctx.destroy(linger=None) + + Close all sockets associated with this context, and then terminate + the context. If linger is specified, + the LINGER sockopt of the sockets will be set prior to closing. + + .. warning:: + + destroy involves calling ``zmq_close()``, which is **NOT** threadsafe. + If there are active sockets in other threads, this must not be called. + """ + + cdef int linger_c + cdef bint setlinger=False + + if linger is not None: + linger_c = linger + setlinger=True + + if self.handle != NULL and not self.closed and self._n_sockets: + while self._n_sockets: + if setlinger: + zmq_setsockopt(self._sockets[0], ZMQ_LINGER, &linger_c, sizeof(int)) + rc = zmq_close(self._sockets[0]) + if rc < 0 and zmq_errno() != ZMQ_ENOTSOCK: + raise ZMQError() + self._n_sockets -= 1 + self._sockets[0] = self._sockets[self._n_sockets] + self.term() + +__all__ = ['Context'] diff --git a/zmq/backend/cython/error.pyx b/zmq/backend/cython/error.pyx new file mode 100644 index 0000000..85e785f --- /dev/null +++ b/zmq/backend/cython/error.pyx @@ -0,0 +1,56 @@ +"""0MQ Error classes and functions.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# allow const char* +cdef extern from *: + ctypedef char* const_char_ptr "const char*" + +from libzmq cimport zmq_strerror, zmq_errno as zmq_errno_c + +from zmq.utils.strtypes import bytes + +def strerror(int errno): + """strerror(errno) + + Return the error string given the error number. + """ + cdef const_char_ptr str_e + # char * will be a bytes object: + str_e = zmq_strerror(errno) + if str is bytes: + # Python 2: str is bytes, so we already have the right type + return str_e + else: + # Python 3: decode bytes to unicode str + return str_e.decode() + +def zmq_errno(): + """zmq_errno() + + Return the integer errno of the most recent zmq error. + """ + return zmq_errno_c() + +__all__ = ['strerror', 'zmq_errno'] diff --git a/zmq/backend/cython/libzmq.pxd b/zmq/backend/cython/libzmq.pxd new file mode 100644 index 0000000..e42f6d6 --- /dev/null +++ b/zmq/backend/cython/libzmq.pxd @@ -0,0 +1,110 @@ +"""All the C imports for 0MQ""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Import the C header files +#----------------------------------------------------------------------------- + +cdef extern from *: + ctypedef void* const_void_ptr "const void *" + ctypedef char* const_char_ptr "const char *" + +cdef extern from "zmq_compat.h": + ctypedef signed long long int64_t "pyzmq_int64_t" + +include "constant_enums.pxi" + +cdef extern from "zmq.h" nogil: + + void _zmq_version "zmq_version"(int *major, int *minor, int *patch) + + ctypedef int fd_t "ZMQ_FD_T" + + enum: errno + char *zmq_strerror (int errnum) + int zmq_errno() + + void *zmq_ctx_new () + int zmq_ctx_destroy (void *context) + int zmq_ctx_set (void *context, int option, int optval) + int zmq_ctx_get (void *context, int option) + void *zmq_init (int io_threads) + int zmq_term (void *context) + + # blackbox def for zmq_msg_t + ctypedef void * zmq_msg_t "zmq_msg_t" + + ctypedef void zmq_free_fn(void *data, void *hint) + + int zmq_msg_init (zmq_msg_t *msg) + int zmq_msg_init_size (zmq_msg_t *msg, size_t size) + int zmq_msg_init_data (zmq_msg_t *msg, void *data, + size_t size, zmq_free_fn *ffn, void *hint) + int zmq_msg_send (zmq_msg_t *msg, void *s, int flags) + int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags) + int zmq_msg_close (zmq_msg_t *msg) + int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src) + int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src) + void *zmq_msg_data (zmq_msg_t *msg) + size_t zmq_msg_size (zmq_msg_t *msg) + int zmq_msg_more (zmq_msg_t *msg) + int zmq_msg_get (zmq_msg_t *msg, int option) + int zmq_msg_set (zmq_msg_t *msg, int option, int optval) + const_char_ptr zmq_msg_gets (zmq_msg_t *msg, const_char_ptr property) + int zmq_has (const_char_ptr capability) + + void *zmq_socket (void *context, int type) + int zmq_close (void *s) + int zmq_setsockopt (void *s, int option, void *optval, size_t optvallen) + int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen) + int zmq_bind (void *s, char *addr) + int zmq_connect (void *s, char *addr) + int zmq_unbind (void *s, char *addr) + int zmq_disconnect (void *s, char *addr) + + int zmq_socket_monitor (void *s, char *addr, int flags) + + # send/recv + int zmq_sendbuf (void *s, const_void_ptr buf, size_t n, int flags) + int zmq_recvbuf (void *s, void *buf, size_t n, int flags) + + ctypedef struct zmq_pollitem_t: + void *socket + int fd + short events + short revents + + int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout) + + int zmq_device (int device_, void *insocket_, void *outsocket_) + int zmq_proxy (void *frontend, void *backend, void *capture) + +cdef extern from "zmq_utils.h" nogil: + + void *zmq_stopwatch_start () + unsigned long zmq_stopwatch_stop (void *watch_) + void zmq_sleep (int seconds_) + int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key) + diff --git a/zmq/backend/cython/message.pxd b/zmq/backend/cython/message.pxd new file mode 100644 index 0000000..4781195 --- /dev/null +++ b/zmq/backend/cython/message.pxd @@ -0,0 +1,63 @@ +"""0MQ Message related class declarations.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from cpython cimport PyBytes_FromStringAndSize + +from libzmq cimport zmq_msg_t, zmq_msg_data, zmq_msg_size + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +cdef class MessageTracker(object): + + cdef set events # Message Event objects to track. + cdef set peers # Other Message or MessageTracker objects. + + +cdef class Frame: + + cdef zmq_msg_t zmq_msg + cdef object _data # The actual message data as a Python object. + cdef object _buffer # A Python Buffer/View of the message contents + cdef object _bytes # A bytes/str copy of the message. + cdef bint _failed_init # Flag to handle failed zmq_msg_init + cdef public object tracker_event # Event for use with zmq_free_fn. + cdef public object tracker # MessageTracker object. + cdef public bint more # whether RCVMORE was set + + cdef Frame fast_copy(self) # Create shallow copy of Message object. + cdef object _getbuffer(self) # Construct self._buffer. + + +cdef inline object copy_zmq_msg_bytes(zmq_msg_t *zmq_msg): + """ Copy the data from a zmq_msg_t """ + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c + data_c = zmq_msg_data(zmq_msg) + data_len_c = zmq_msg_size(zmq_msg) + return PyBytes_FromStringAndSize(data_c, data_len_c) + + diff --git a/zmq/backend/cython/message.pyx b/zmq/backend/cython/message.pyx new file mode 100644 index 0000000..312ae12 --- /dev/null +++ b/zmq/backend/cython/message.pyx @@ -0,0 +1,381 @@ +"""0MQ Message related classes.""" + +# +# Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# get version-independent aliases: +cdef extern from "pyversion_compat.h": + pass + +from cpython cimport Py_DECREF, Py_INCREF + +from buffers cimport asbuffer_r, viewfromobject_r + +cdef extern from "Python.h": + ctypedef int Py_ssize_t + +from libzmq cimport * + +from libc.stdio cimport fprintf, stderr as cstderr +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy + +import time + +try: + # below 3.3 + from threading import _Event as Event +except (ImportError, AttributeError): + # python throws ImportError, cython throws AttributeError + from threading import Event + +import zmq +from zmq.error import _check_version +from zmq.backend.cython.checkrc cimport _check_rc +from zmq.utils.strtypes import bytes,unicode,basestring + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +ctypedef struct zhint: + void *ctx + size_t id + +cdef void free_python_msg(void *data, void *vhint) nogil: + """A pure-C function for DECREF'ing Python-owned message data. + + Sends a message on a PUSH socket + + The hint is a `zhint` struct with two values: + + ctx (void *): pointer to the Garbage Collector's context + id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket, + signaling the Garbage Collector to remove its reference to the object. + + - A PUSH socket is created in the context, + - it is connected to the garbage collector inproc channel, + - it sends the gc message + - the PUSH socket is closed + + When the Garbage Collector's PULL socket receives the message, + it deletes its reference to the object, + allowing Python to free the memory. + """ + cdef void *push + cdef zmq_msg_t msg + cdef zhint *hint = vhint + if hint != NULL: + zmq_msg_init_size(&msg, sizeof(size_t)) + memcpy(zmq_msg_data(&msg), &hint.id, sizeof(size_t)) + + push = zmq_socket(hint.ctx, ZMQ_PUSH) + if push == NULL: + # this will happen if the context has been terminated + return + rc = zmq_connect(push, "inproc://pyzmq.gc.01") + if rc < 0: + fprintf(cstderr, "pyzmq-gc connect failed: %s\n", zmq_strerror(zmq_errno())) + return + + rc = zmq_msg_send(&msg, push, 0) + if rc < 0: + fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(zmq_errno())) + + zmq_msg_close(&msg) + zmq_close(push) + free(hint) + +gc = None + +cdef class Frame: + """Frame(data=None, track=False) + + A zmq message Frame class for non-copy send/recvs. + + This class is only needed if you want to do non-copying send and recvs. + When you pass a string to this class, like ``Frame(s)``, the + ref-count of `s` is increased by two: once because the Frame saves `s` as + an instance attribute and another because a ZMQ message is created that + points to the buffer of `s`. This second ref-count increase makes sure + that `s` lives until all messages that use it have been sent. Once 0MQ + sends all the messages and it doesn't need the buffer of s, 0MQ will call + ``Py_DECREF(s)``. + + Parameters + ---------- + + data : object, optional + any object that provides the buffer interface will be used to + construct the 0MQ message data. + track : bool [default: False] + whether a MessageTracker_ should be created to track this object. + Tracking a message has a cost at creation, because it creates a threadsafe + Event object. + + """ + + def __cinit__(self, object data=None, track=False, **kwargs): + cdef int rc + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c=0 + cdef zhint *hint + + # init more as False + self.more = False + + # Save the data object in case the user wants the the data as a str. + self._data = data + self._failed_init = True # bool switch for dealloc + self._buffer = None # buffer view of data + self._bytes = None # bytes copy of data + + # Event and MessageTracker for monitoring when zmq is done with data: + if track: + evt = Event() + self.tracker_event = evt + self.tracker = zmq.MessageTracker(evt) + else: + self.tracker_event = None + self.tracker = None + + if isinstance(data, unicode): + raise TypeError("Unicode objects not allowed. Only: str/bytes, buffer interfaces.") + + if data is None: + rc = zmq_msg_init(&self.zmq_msg) + _check_rc(rc) + self._failed_init = False + return + else: + asbuffer_r(data, &data_c, &data_len_c) + + # create the hint for zmq_free_fn + # two pointers: the gc context and a message to be sent to the gc PULL socket + # allows libzmq to signal to Python when it is done with Python-owned memory. + global gc + if gc is None: + from zmq.utils.garbage import gc + + hint = malloc(sizeof(zhint)) + hint.id = gc.store(data, self.tracker_event) + hint.ctx = gc._context.underlying + + rc = zmq_msg_init_data( + &self.zmq_msg, data_c, data_len_c, + free_python_msg, hint + ) + if rc != 0: + free(hint) + _check_rc(rc) + self._failed_init = False + + def __init__(self, object data=None, track=False): + """Enforce signature""" + pass + + def __dealloc__(self): + cdef int rc + if self._failed_init: + return + # This simply decreases the 0MQ ref-count of zmq_msg. + with nogil: + rc = zmq_msg_close(&self.zmq_msg) + _check_rc(rc) + + # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project + + def __getbuffer__(self, Py_buffer* buffer, int flags): + # new-style (memoryview) buffer interface + buffer.buf = zmq_msg_data(&self.zmq_msg) + buffer.len = zmq_msg_size(&self.zmq_msg) + + buffer.obj = self + buffer.readonly = 1 + buffer.format = "B" + buffer.ndim = 0 + buffer.shape = NULL + buffer.strides = NULL + buffer.suboffsets = NULL + buffer.itemsize = 1 + buffer.internal = NULL + + def __getsegcount__(self, Py_ssize_t *lenp): + # required for getreadbuffer + if lenp != NULL: + lenp[0] = zmq_msg_size(&self.zmq_msg) + return 1 + + def __getreadbuffer__(self, Py_ssize_t idx, void **p): + # old-style (buffer) interface + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + # read-only, because we don't want to allow + # editing of the message in-place + data_c = zmq_msg_data(&self.zmq_msg) + data_len_c = zmq_msg_size(&self.zmq_msg) + if p != NULL: + p[0] = data_c + return data_len_c + + # end buffer interface + + def __copy__(self): + """Create a shallow copy of the message. + + This does not copy the contents of the Frame, just the pointer. + This will increment the 0MQ ref count of the message, but not + the ref count of the Python object. That is only done once when + the Python is first turned into a 0MQ message. + """ + return self.fast_copy() + + cdef Frame fast_copy(self): + """Fast, cdef'd version of shallow copy of the Frame.""" + cdef Frame new_msg + new_msg = Frame() + # This does not copy the contents, but just increases the ref-count + # of the zmq_msg by one. + zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg) + # Copy the ref to data so the copy won't create a copy when str is + # called. + if self._data is not None: + new_msg._data = self._data + if self._buffer is not None: + new_msg._buffer = self._buffer + if self._bytes is not None: + new_msg._bytes = self._bytes + + # Frame copies share the tracker and tracker_event + new_msg.tracker_event = self.tracker_event + new_msg.tracker = self.tracker + + return new_msg + + def __len__(self): + """Return the length of the message in bytes.""" + cdef size_t sz + sz = zmq_msg_size(&self.zmq_msg) + return sz + # return zmq_msg_size(&self.zmq_msg) + + def __str__(self): + """Return the str form of the message.""" + if isinstance(self._data, bytes): + b = self._data + else: + b = self.bytes + if str is unicode: + return b.decode() + else: + return b + + cdef inline object _getbuffer(self): + """Create a Python buffer/view of the message data. + + This will be called only once, the first time the `buffer` property + is accessed. Subsequent calls use a cached copy. + """ + if self._data is None: + return viewfromobject_r(self) + else: + return viewfromobject_r(self._data) + + @property + def buffer(self): + """A read-only buffer view of the message contents.""" + if self._buffer is None: + self._buffer = self._getbuffer() + return self._buffer + + @property + def bytes(self): + """The message content as a Python bytes object. + + The first time this property is accessed, a copy of the message + contents is made. From then on that same copy of the message is + returned. + """ + if self._bytes is None: + self._bytes = copy_zmq_msg_bytes(&self.zmq_msg) + return self._bytes + + def set(self, int option, int value): + """Frame.set(option, value) + + Set a Frame option. + + See the 0MQ API documentation for zmq_msg_set + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + """ + cdef int rc = zmq_msg_set(&self.zmq_msg, option, value) + _check_rc(rc) + + def get(self, option): + """Frame.get(option) + + Get a Frame option or property. + + See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + .. versionchanged:: 14.3 + add support for zmq_msg_gets (requires libzmq-4.1) + """ + cdef int rc = 0 + cdef char *property_c = NULL + cdef Py_ssize_t property_len_c = 0 + + # zmq_msg_get + if isinstance(option, int): + rc = zmq_msg_get(&self.zmq_msg, option) + _check_rc(rc) + return rc + + # zmq_msg_gets + _check_version((4,1), "get string properties") + if isinstance(option, unicode): + option = option.encode('utf8') + + if not isinstance(option, bytes): + raise TypeError("expected str, got: %r" % option) + + property_c = option + + cdef const char *result = zmq_msg_gets(&self.zmq_msg, property_c) + if result == NULL: + _check_rc(-1) + return result.decode('utf8') + +# legacy Message name +Message = Frame + +__all__ = ['Frame', 'Message'] diff --git a/zmq/backend/cython/rebuffer.pyx b/zmq/backend/cython/rebuffer.pyx new file mode 100644 index 0000000..402e3b6 --- /dev/null +++ b/zmq/backend/cython/rebuffer.pyx @@ -0,0 +1,104 @@ +""" +Utility for changing itemsize of memoryviews, and getting +numpy arrays from byte-arrays that should be interpreted with a different +itemsize. + +Authors +------- +* MinRK +""" + +#----------------------------------------------------------------------------- +# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley +# +# This file is part of pyzmq +# +# Distributed under the terms of the New BSD License. The full license is in +# the file COPYING.BSD, distributed as part of this software. +#----------------------------------------------------------------------------- + +from libc.stdlib cimport malloc +from buffers cimport * + +cdef inline object _rebuffer(object obj, char * format, int itemsize): + """clobber the format & itemsize of a 1-D + + This is the Python 3 model, but will work on Python >= 2.6. Currently, + we use it only on >= 3.0. + """ + cdef Py_buffer view + cdef int flags = PyBUF_SIMPLE + cdef int mode = 0 + # cdef Py_ssize_t *shape, *strides, *suboffsets + + mode = check_buffer(obj) + if mode == 0: + raise TypeError("%r does not provide a buffer interface."%obj) + + if mode == 3: + flags = PyBUF_ANY_CONTIGUOUS + if format: + flags |= PyBUF_FORMAT + PyObject_GetBuffer(obj, &view, flags) + assert view.ndim <= 1, "Can only reinterpret 1-D memoryviews" + assert view.len % itemsize == 0, "Buffer of length %i not divisible into items of size %i"%(view.len, itemsize) + # hack the format + view.ndim = 1 + view.format = format + view.itemsize = itemsize + view.strides = malloc(sizeof(Py_ssize_t)) + view.strides[0] = itemsize + view.shape = malloc(sizeof(Py_ssize_t)) + view.shape[0] = view.len/itemsize + view.suboffsets = malloc(sizeof(Py_ssize_t)) + view.suboffsets[0] = 0 + # for debug: make buffer writable, for zero-copy testing + # view.readonly = 0 + + return PyMemoryView_FromBuffer(&view) + else: + raise TypeError("This funciton is only for new-style buffer objects.") + +def rebuffer(obj, format, itemsize): + """Change the itemsize of a memoryview. + + Only for 1D contiguous buffers. + """ + return _rebuffer(obj, format, itemsize) + +def array_from_buffer(view, dtype, shape): + """Get a numpy array from a memoryview, regardless of the itemsize of the original + memoryview. This is important, because pyzmq does not send memoryview shape data + over the wire, so we need to change the memoryview itemsize before calling + asarray. + """ + import numpy + A = numpy.array([],dtype=dtype) + ref = viewfromobject(A,0) + fmt = ref.format.encode() + buf = viewfromobject(view, 0) + buf = _rebuffer(view, fmt, ref.itemsize) + return numpy.asarray(buf, dtype=dtype).reshape(shape) + +def print_view_info(obj): + """simple utility for printing info on a new-style buffer object""" + cdef Py_buffer view + cdef int flags = PyBUF_ANY_CONTIGUOUS|PyBUF_FORMAT + cdef int mode = 0 + + mode = check_buffer(obj) + if mode == 0: + raise TypeError("%r does not provide a buffer interface."%obj) + + if mode == 3: + PyObject_GetBuffer(obj, &view, flags) + print view.buf, view.len, view.format, view.ndim, + if view.ndim: + if view.shape: + print view.shape[0], + if view.strides: + print view.strides[0], + if view.suboffsets: + print view.suboffsets[0], + print + \ No newline at end of file diff --git a/zmq/backend/cython/socket.pxd b/zmq/backend/cython/socket.pxd new file mode 100644 index 0000000..b8a331e --- /dev/null +++ b/zmq/backend/cython/socket.pxd @@ -0,0 +1,47 @@ +"""0MQ Socket class declaration.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from context cimport Context + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +cdef class Socket: + + cdef object __weakref__ # enable weakref + cdef void *handle # The C handle for the underlying zmq object. + cdef bint _shadow # whether the Socket is a shadow wrapper of another + # Hold on to a reference to the context to make sure it is not garbage + # collected until the socket it done with it. + cdef public Context context # The zmq Context object that owns this. + cdef public bint _closed # bool property for a closed socket. + cdef int _pid # the pid of the process which created me (for fork safety) + + # cpdef methods for direct-cython access: + cpdef object send(self, object data, int flags=*, copy=*, track=*) + cpdef object recv(self, int flags=*, copy=*, track=*) + diff --git a/zmq/backend/cython/socket.pyx b/zmq/backend/cython/socket.pyx new file mode 100644 index 0000000..9b9ec36 --- /dev/null +++ b/zmq/backend/cython/socket.pyx @@ -0,0 +1,672 @@ +"""0MQ Socket class.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Cython Imports +#----------------------------------------------------------------------------- + +# get version-independent aliases: +cdef extern from "pyversion_compat.h": + pass + +from libc.errno cimport ENAMETOOLONG +from libc.string cimport memcpy + +from cpython cimport PyBytes_FromStringAndSize +from cpython cimport PyBytes_AsString, PyBytes_Size +from cpython cimport Py_DECREF, Py_INCREF + +from buffers cimport asbuffer_r, viewfromobject_r + +from libzmq cimport * +from message cimport Frame, copy_zmq_msg_bytes + +from context cimport Context + +cdef extern from "Python.h": + ctypedef int Py_ssize_t + +cdef extern from "ipcmaxlen.h": + int get_ipc_path_max_len() + +cdef extern from "getpid_compat.h": + int getpid() + + +#----------------------------------------------------------------------------- +# Python Imports +#----------------------------------------------------------------------------- + +import copy as copy_mod +import time +import sys +import random +import struct +import codecs + +from zmq.utils import jsonapi + +try: + import cPickle + pickle = cPickle +except: + cPickle = None + import pickle + +import zmq +from zmq.backend.cython import constants +from zmq.backend.cython.constants import * +from zmq.backend.cython.checkrc cimport _check_rc +from zmq.error import ZMQError, ZMQBindError, _check_version +from zmq.utils.strtypes import bytes,unicode,basestring + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +IPC_PATH_MAX_LEN = get_ipc_path_max_len() + +# inline some small socket submethods: +# true methods frequently cannot be inlined, acc. Cython docs + +cdef inline _check_closed(Socket s): + """raise ENOTSUP if socket is closed + + Does not do a deep check + """ + if s._closed: + raise ZMQError(ENOTSOCK) + +cdef inline _check_closed_deep(Socket s): + """thorough check of whether the socket has been closed, + even if by another entity (e.g. ctx.destroy). + + Only used by the `closed` property. + + returns True if closed, False otherwise + """ + cdef int rc + cdef int errno + cdef int stype + cdef size_t sz=sizeof(int) + if s._closed: + return True + else: + rc = zmq_getsockopt(s.handle, ZMQ_TYPE, &stype, &sz) + if rc < 0 and zmq_errno() == ENOTSOCK: + s._closed = True + return True + else: + _check_rc(rc) + return False + +cdef inline Frame _recv_frame(void *handle, int flags=0, track=False): + """Receive a message in a non-copying manner and return a Frame.""" + cdef int rc + msg = zmq.Frame(track=track) + cdef Frame cmsg = msg + + with nogil: + rc = zmq_msg_recv(&cmsg.zmq_msg, handle, flags) + + _check_rc(rc) + return msg + +cdef inline object _recv_copy(void *handle, int flags=0): + """Receive a message and return a copy""" + cdef zmq_msg_t zmq_msg + with nogil: + zmq_msg_init (&zmq_msg) + rc = zmq_msg_recv(&zmq_msg, handle, flags) + _check_rc(rc) + msg_bytes = copy_zmq_msg_bytes(&zmq_msg) + zmq_msg_close(&zmq_msg) + return msg_bytes + +cdef inline object _send_frame(void *handle, Frame msg, int flags=0): + """Send a Frame on this socket in a non-copy manner.""" + cdef int rc + cdef Frame msg_copy + + # Always copy so the original message isn't garbage collected. + # This doesn't do a real copy, just a reference. + msg_copy = msg.fast_copy() + + with nogil: + rc = zmq_msg_send(&msg_copy.zmq_msg, handle, flags) + + _check_rc(rc) + return msg.tracker + + +cdef inline object _send_copy(void *handle, object msg, int flags=0): + """Send a message on this socket by copying its content.""" + cdef int rc, rc2 + cdef zmq_msg_t data + cdef char *msg_c + cdef Py_ssize_t msg_c_len=0 + + # copy to c array: + asbuffer_r(msg, &msg_c, &msg_c_len) + + # Copy the msg before sending. This avoids any complications with + # the GIL, etc. + # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error) + rc = zmq_msg_init_size(&data, msg_c_len) + + _check_rc(rc) + + with nogil: + memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data)) + rc = zmq_msg_send(&data, handle, flags) + rc2 = zmq_msg_close(&data) + _check_rc(rc) + _check_rc(rc2) + + +cdef class Socket: + """Socket(context, socket_type) + + A 0MQ socket. + + These objects will generally be constructed via the socket() method of a Context object. + + Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads. + + Parameters + ---------- + context : Context + The 0MQ Context this Socket belongs to. + socket_type : int + The socket type, which can be any of the 0MQ socket types: + REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB. + + See Also + -------- + .Context.socket : method for creating a socket bound to a Context. + """ + + # no-op for the signature + def __init__(self, context=None, socket_type=-1, shadow=0): + pass + + def __cinit__(self, Context context=None, int socket_type=-1, size_t shadow=0, *args, **kwargs): + cdef Py_ssize_t c_handle + + self.handle = NULL + self.context = context + if shadow: + self._shadow = True + self.handle = shadow + else: + if context is None: + raise TypeError("context must be specified") + if socket_type < 0: + raise TypeError("socket_type must be specified") + self._shadow = False + self.handle = zmq_socket(context.handle, socket_type) + if self.handle == NULL: + raise ZMQError() + self._closed = False + self._pid = getpid() + if context: + context._add_socket(self.handle) + + def __dealloc__(self): + """remove from context's list + + But be careful that context might not exist if called during gc + """ + if self.handle != NULL and not self._shadow and getpid() == self._pid: + # during gc, self.context might be NULL + if self.context and not self.context.closed: + self.context._remove_socket(self.handle) + + @property + def underlying(self): + """The address of the underlying libzmq socket""" + return self.handle + + @property + def closed(self): + return _check_closed_deep(self) + + def close(self, linger=None): + """s.close(linger=None) + + Close the socket. + + If linger is specified, LINGER sockopt will be set prior to closing. + + This can be called to close the socket by hand. If this is not + called, the socket will automatically be closed when it is + garbage collected. + """ + cdef int rc=0 + cdef int linger_c + cdef bint setlinger=False + + if linger is not None: + linger_c = linger + setlinger=True + + if self.handle != NULL and not self._closed and getpid() == self._pid: + if setlinger: + zmq_setsockopt(self.handle, ZMQ_LINGER, &linger_c, sizeof(int)) + rc = zmq_close(self.handle) + if rc != 0 and zmq_errno() != ENOTSOCK: + # ignore ENOTSOCK (closed by Context) + _check_rc(rc) + self._closed = True + # during gc, self.context might be NULL + if self.context: + self.context._remove_socket(self.handle) + self.handle = NULL + + def set(self, int option, optval): + """s.set(option, optval) + + Set socket options. + + See the 0MQ API documentation for details on specific options. + + Parameters + ---------- + option : int + The option to set. Available values will depend on your + version of libzmq. Examples include:: + + zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD + + optval : int or bytes + The value of the option to set. + """ + cdef int64_t optval_int64_c + cdef int optval_int_c + cdef int rc + cdef char* optval_c + cdef Py_ssize_t sz + + _check_closed(self) + if isinstance(optval, unicode): + raise TypeError("unicode not allowed, use setsockopt_string") + + if option in zmq.constants.bytes_sockopts: + if not isinstance(optval, bytes): + raise TypeError('expected bytes, got: %r' % optval) + optval_c = PyBytes_AsString(optval) + sz = PyBytes_Size(optval) + rc = zmq_setsockopt( + self.handle, option, + optval_c, sz + ) + elif option in zmq.constants.int64_sockopts: + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int64_c = optval + rc = zmq_setsockopt( + self.handle, option, + &optval_int64_c, sizeof(int64_t) + ) + else: + # default is to assume int, which is what most new sockopts will be + # this lets pyzmq work with newer libzmq which may add constants + # pyzmq has not yet added, rather than artificially raising. Invalid + # sockopts will still raise just the same, but it will be libzmq doing + # the raising. + if not isinstance(optval, int): + raise TypeError('expected int, got: %r' % optval) + optval_int_c = optval + rc = zmq_setsockopt( + self.handle, option, + &optval_int_c, sizeof(int) + ) + + _check_rc(rc) + + def get(self, int option): + """s.get(option) + + Get the value of a socket option. + + See the 0MQ API documentation for details on specific options. + + Parameters + ---------- + option : int + The option to get. Available values will depend on your + version of libzmq. Examples include:: + + zmq.IDENTITY, HWM, LINGER, FD, EVENTS + + Returns + ------- + optval : int or bytes + The value of the option as a bytestring or int. + """ + cdef int64_t optval_int64_c + cdef int optval_int_c + cdef fd_t optval_fd_c + cdef char identity_str_c [255] + cdef size_t sz + cdef int rc + + _check_closed(self) + + if option in zmq.constants.bytes_sockopts: + sz = 255 + rc = zmq_getsockopt(self.handle, option, identity_str_c, &sz) + _check_rc(rc) + # strip null-terminated strings *except* identity + if option != ZMQ_IDENTITY and sz > 0 and (identity_str_c)[sz-1] == b'\0': + sz -= 1 + result = PyBytes_FromStringAndSize(identity_str_c, sz) + elif option in zmq.constants.int64_sockopts: + sz = sizeof(int64_t) + rc = zmq_getsockopt(self.handle, option, &optval_int64_c, &sz) + _check_rc(rc) + result = optval_int64_c + elif option in zmq.constants.fd_sockopts: + sz = sizeof(fd_t) + rc = zmq_getsockopt(self.handle, option, &optval_fd_c, &sz) + _check_rc(rc) + result = optval_fd_c + else: + # default is to assume int, which is what most new sockopts will be + # this lets pyzmq work with newer libzmq which may add constants + # pyzmq has not yet added, rather than artificially raising. Invalid + # sockopts will still raise just the same, but it will be libzmq doing + # the raising. + sz = sizeof(int) + rc = zmq_getsockopt(self.handle, option, &optval_int_c, &sz) + _check_rc(rc) + result = optval_int_c + + return result + + def bind(self, addr): + """s.bind(addr) + + Bind the socket to an address. + + This causes the socket to listen on a network port. Sockets on the + other side of this connection will use ``Socket.connect(addr)`` to + connect to this socket. + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported include + tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + rc = zmq_bind(self.handle, c_addr) + if rc != 0: + if IPC_PATH_MAX_LEN and zmq_errno() == ENAMETOOLONG: + # py3compat: addr is bytes, but msg wants str + if str is unicode: + addr = addr.decode('utf-8', 'replace') + path = addr.split('://', 1)[-1] + msg = ('ipc path "{0}" is longer than {1} ' + 'characters (sizeof(sockaddr_un.sun_path)). ' + 'zmq.IPC_PATH_MAX_LEN constant can be used ' + 'to check addr length (if it is defined).' + .format(path, IPC_PATH_MAX_LEN)) + raise ZMQError(msg=msg) + _check_rc(rc) + + def connect(self, addr): + """s.connect(addr) + + Connect to a remote 0MQ socket. + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_connect(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def unbind(self, addr): + """s.unbind(addr) + + Unbind from an address (undoes a call to bind). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_version((3,2), "unbind") + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_unbind(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def disconnect(self, addr): + """s.disconnect(addr) + + Disconnect from a remote 0MQ socket (undoes a call to connect). + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + Parameters + ---------- + addr : str + The address string. This has the form 'protocol://interface:port', + for example 'tcp://127.0.0.1:5555'. Protocols supported are + tcp, upd, pgm, inproc and ipc. If the address is unicode, it is + encoded to utf-8 first. + """ + cdef int rc + cdef char* c_addr + + _check_version((3,2), "disconnect") + _check_closed(self) + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + + rc = zmq_disconnect(self.handle, c_addr) + if rc != 0: + raise ZMQError() + + def monitor(self, addr, int events=ZMQ_EVENT_ALL): + """s.monitor(addr, flags) + + Start publishing socket events on inproc. + See libzmq docs for zmq_monitor for details. + + While this function is available from libzmq 3.2, + pyzmq cannot parse monitor messages from libzmq prior to 4.0. + + .. versionadded: libzmq-3.2 + .. versionadded: 14.0 + + Parameters + ---------- + addr : str + The inproc url used for monitoring. Passing None as + the addr will cause an existing socket monitor to be + deregistered. + events : int [default: zmq.EVENT_ALL] + The zmq event bitmask for which events will be sent to the monitor. + """ + cdef int rc, c_flags + cdef char* c_addr = NULL + + _check_version((3,2), "monitor") + if addr is not None: + if isinstance(addr, unicode): + addr = addr.encode('utf-8') + if not isinstance(addr, bytes): + raise TypeError('expected str, got: %r' % addr) + c_addr = addr + c_flags = events + rc = zmq_socket_monitor(self.handle, c_addr, c_flags) + _check_rc(rc) + + #------------------------------------------------------------------------- + # Sending and receiving messages + #------------------------------------------------------------------------- + + cpdef object send(self, object data, int flags=0, copy=True, track=False): + """s.send(data, flags=0, copy=True, track=False) + + Send a message on this socket. + + This queues the message to be sent by the IO thread at a later time. + + Parameters + ---------- + data : object, str, Frame + The content of the message. + flags : int + Any supported flag: NOBLOCK, SNDMORE. + copy : bool + Should the message be sent in a copying or non-copying manner. + track : bool + Should the message be tracked for notification that ZMQ has + finished with it? (ignored if copy=True) + + Returns + ------- + None : if `copy` or not track + None if message was sent, raises an exception otherwise. + MessageTracker : if track and not copy + a MessageTracker object, whose `pending` property will + be True until the send is completed. + + Raises + ------ + TypeError + If a unicode object is passed + ValueError + If `track=True`, but an untracked Frame is passed. + ZMQError + If the send does not succeed for any reason. + + """ + _check_closed(self) + + if isinstance(data, unicode): + raise TypeError("unicode not allowed, use send_string") + + if copy: + # msg.bytes never returns the input data object + # it is always a copy, but always the same copy + if isinstance(data, Frame): + data = data.buffer + return _send_copy(self.handle, data, flags) + else: + if isinstance(data, Frame): + if track and not data.tracker: + raise ValueError('Not a tracked message') + msg = data + else: + msg = Frame(data, track=track) + return _send_frame(self.handle, msg, flags) + + cpdef object recv(self, int flags=0, copy=True, track=False): + """s.recv(flags=0, copy=True, track=False) + + Receive a message. + + Parameters + ---------- + flags : int + Any supported flag: NOBLOCK. If NOBLOCK is set, this method + will raise a ZMQError with EAGAIN if a message is not ready. + If NOBLOCK is not set, then this method will block until a + message arrives. + copy : bool + Should the message be received in a copying or non-copying manner? + If False a Frame object is returned, if True a string copy of + message is returned. + track : bool + Should the message be tracked for notification that ZMQ has + finished with it? (ignored if copy=True) + + Returns + ------- + msg : bytes, Frame + The received message frame. If `copy` is False, then it will be a Frame, + otherwise it will be bytes. + + Raises + ------ + ZMQError + for any of the reasons zmq_msg_recv might fail. + """ + _check_closed(self) + + if copy: + return _recv_copy(self.handle, flags) + else: + frame = _recv_frame(self.handle, flags, track) + frame.more = self.getsockopt(zmq.RCVMORE) + return frame + + +__all__ = ['Socket', 'IPC_PATH_MAX_LEN'] diff --git a/zmq/backend/cython/utils.pxd b/zmq/backend/cython/utils.pxd new file mode 100644 index 0000000..1d7117f --- /dev/null +++ b/zmq/backend/cython/utils.pxd @@ -0,0 +1,29 @@ +"""Wrap zmq_utils.h""" + +# +# Copyright (c) 2010 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +cdef class Stopwatch: + cdef void *watch # The C handle for the underlying zmq object + diff --git a/zmq/backend/cython/utils.pyx b/zmq/backend/cython/utils.pyx new file mode 100644 index 0000000..68976e3 --- /dev/null +++ b/zmq/backend/cython/utils.pyx @@ -0,0 +1,119 @@ +"""0MQ utils.""" + +# +# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . +# + +from libzmq cimport ( + zmq_stopwatch_start, zmq_stopwatch_stop, zmq_sleep, zmq_curve_keypair, + zmq_has, const_char_ptr +) +from zmq.error import ZMQError, _check_rc, _check_version +from zmq.utils.strtypes import unicode + +def has(capability): + """Check for zmq capability by name (e.g. 'ipc', 'curve') + + .. versionadded:: libzmq-4.1 + .. versionadded:: 14.1 + """ + _check_version((4,1), 'zmq.has') + cdef bytes ccap + if isinstance(capability, unicode): + capability = capability.encode('utf8') + ccap = capability + return bool(zmq_has(ccap)) + +def curve_keypair(): + """generate a Z85 keypair for use with zmq.CURVE security + + Requires libzmq (≥ 4.0) to have been linked with libsodium. + + .. versionadded:: libzmq-4.0 + .. versionadded:: 14.0 + + Returns + ------- + (public, secret) : two bytestrings + The public and private keypair as 40 byte z85-encoded bytestrings. + """ + cdef int rc + cdef char[64] public_key + cdef char[64] secret_key + _check_version((4,0), "curve_keypair") + rc = zmq_curve_keypair (public_key, secret_key) + _check_rc(rc) + return public_key, secret_key + + +cdef class Stopwatch: + """Stopwatch() + + A simple stopwatch based on zmq_stopwatch_start/stop. + + This class should be used for benchmarking and timing 0MQ code. + """ + + def __cinit__(self): + self.watch = NULL + + def __dealloc__(self): + # copy of self.stop() we can't call object methods in dealloc as it + # might already be partially deleted + if self.watch: + zmq_stopwatch_stop(self.watch) + self.watch = NULL + + def start(self): + """s.start() + + Start the stopwatch. + """ + if self.watch == NULL: + self.watch = zmq_stopwatch_start() + else: + raise ZMQError('Stopwatch is already running.') + + def stop(self): + """s.stop() + + Stop the stopwatch. + + Returns + ------- + t : unsigned long int + the number of microseconds since ``start()`` was called. + """ + cdef unsigned long time + if self.watch == NULL: + raise ZMQError('Must start the Stopwatch before calling stop.') + else: + time = zmq_stopwatch_stop(self.watch) + self.watch = NULL + return time + + def sleep(self, int seconds): + """s.sleep(seconds) + + Sleep for an integer number of seconds. + """ + with nogil: + zmq_sleep(seconds) + + +__all__ = ['has', 'curve_keypair', 'Stopwatch'] diff --git a/zmq/backend/select.py b/zmq/backend/select.py new file mode 100644 index 0000000..0a2e09a --- /dev/null +++ b/zmq/backend/select.py @@ -0,0 +1,39 @@ +"""Import basic exposure of libzmq C API as a backend""" + +# Copyright (C) PyZMQ Developers +# Distributed under the terms of the Modified BSD License. + +public_api = [ + 'Context', + 'Socket', + 'Frame', + 'Message', + 'Stopwatch', + 'device', + 'proxy', + 'zmq_poll', + 'strerror', + 'zmq_errno', + 'has', + 'curve_keypair', + 'constants', + 'zmq_version_info', + 'IPC_PATH_MAX_LEN', +] + +def select_backend(name): + """Select the pyzmq backend""" + try: + mod = __import__(name, fromlist=public_api) + except ImportError: + raise + except Exception as e: + import sys + from zmq.utils.sixcerpt import reraise + exc_info = sys.exc_info() + reraise(ImportError, ImportError("Importing %s failed with %s" % (name, e)), exc_info[2]) + + ns = {} + for key in public_api: + ns[key] = getattr(mod, key) + return ns -- cgit v1.2.3