diff options
Diffstat (limited to 'zmq/backend/cython/message.pyx')
-rw-r--r-- | zmq/backend/cython/message.pyx | 381 |
1 files changed, 381 insertions, 0 deletions
diff --git a/zmq/backend/cython/message.pyx b/zmq/backend/cython/message.pyx new file mode 100644 index 0000000..312ae12 --- /dev/null +++ b/zmq/backend/cython/message.pyx @@ -0,0 +1,381 @@ +"""0MQ Message related classes.""" + +# +# Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley +# +# This file is part of pyzmq. +# +# pyzmq is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# pyzmq is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# get version-independent aliases: +cdef extern from "pyversion_compat.h": + pass + +from cpython cimport Py_DECREF, Py_INCREF + +from buffers cimport asbuffer_r, viewfromobject_r + +cdef extern from "Python.h": + ctypedef int Py_ssize_t + +from libzmq cimport * + +from libc.stdio cimport fprintf, stderr as cstderr +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy + +import time + +try: + # below 3.3 + from threading import _Event as Event +except (ImportError, AttributeError): + # python throws ImportError, cython throws AttributeError + from threading import Event + +import zmq +from zmq.error import _check_version +from zmq.backend.cython.checkrc cimport _check_rc +from zmq.utils.strtypes import bytes,unicode,basestring + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +ctypedef struct zhint: + void *ctx + size_t id + +cdef void free_python_msg(void *data, void *vhint) nogil: + """A pure-C function for DECREF'ing Python-owned message data. + + Sends a message on a PUSH socket + + The hint is a `zhint` struct with two values: + + ctx (void *): pointer to the Garbage Collector's context + id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket, + signaling the Garbage Collector to remove its reference to the object. + + - A PUSH socket is created in the context, + - it is connected to the garbage collector inproc channel, + - it sends the gc message + - the PUSH socket is closed + + When the Garbage Collector's PULL socket receives the message, + it deletes its reference to the object, + allowing Python to free the memory. + """ + cdef void *push + cdef zmq_msg_t msg + cdef zhint *hint = <zhint *> vhint + if hint != NULL: + zmq_msg_init_size(&msg, sizeof(size_t)) + memcpy(zmq_msg_data(&msg), &hint.id, sizeof(size_t)) + + push = zmq_socket(hint.ctx, ZMQ_PUSH) + if push == NULL: + # this will happen if the context has been terminated + return + rc = zmq_connect(push, "inproc://pyzmq.gc.01") + if rc < 0: + fprintf(cstderr, "pyzmq-gc connect failed: %s\n", zmq_strerror(zmq_errno())) + return + + rc = zmq_msg_send(&msg, push, 0) + if rc < 0: + fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(zmq_errno())) + + zmq_msg_close(&msg) + zmq_close(push) + free(hint) + +gc = None + +cdef class Frame: + """Frame(data=None, track=False) + + A zmq message Frame class for non-copy send/recvs. + + This class is only needed if you want to do non-copying send and recvs. + When you pass a string to this class, like ``Frame(s)``, the + ref-count of `s` is increased by two: once because the Frame saves `s` as + an instance attribute and another because a ZMQ message is created that + points to the buffer of `s`. This second ref-count increase makes sure + that `s` lives until all messages that use it have been sent. Once 0MQ + sends all the messages and it doesn't need the buffer of s, 0MQ will call + ``Py_DECREF(s)``. + + Parameters + ---------- + + data : object, optional + any object that provides the buffer interface will be used to + construct the 0MQ message data. + track : bool [default: False] + whether a MessageTracker_ should be created to track this object. + Tracking a message has a cost at creation, because it creates a threadsafe + Event object. + + """ + + def __cinit__(self, object data=None, track=False, **kwargs): + cdef int rc + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c=0 + cdef zhint *hint + + # init more as False + self.more = False + + # Save the data object in case the user wants the the data as a str. + self._data = data + self._failed_init = True # bool switch for dealloc + self._buffer = None # buffer view of data + self._bytes = None # bytes copy of data + + # Event and MessageTracker for monitoring when zmq is done with data: + if track: + evt = Event() + self.tracker_event = evt + self.tracker = zmq.MessageTracker(evt) + else: + self.tracker_event = None + self.tracker = None + + if isinstance(data, unicode): + raise TypeError("Unicode objects not allowed. Only: str/bytes, buffer interfaces.") + + if data is None: + rc = zmq_msg_init(&self.zmq_msg) + _check_rc(rc) + self._failed_init = False + return + else: + asbuffer_r(data, <void **>&data_c, &data_len_c) + + # create the hint for zmq_free_fn + # two pointers: the gc context and a message to be sent to the gc PULL socket + # allows libzmq to signal to Python when it is done with Python-owned memory. + global gc + if gc is None: + from zmq.utils.garbage import gc + + hint = <zhint *> malloc(sizeof(zhint)) + hint.id = gc.store(data, self.tracker_event) + hint.ctx = <void *> <size_t> gc._context.underlying + + rc = zmq_msg_init_data( + &self.zmq_msg, <void *>data_c, data_len_c, + <zmq_free_fn *>free_python_msg, <void *>hint + ) + if rc != 0: + free(hint) + _check_rc(rc) + self._failed_init = False + + def __init__(self, object data=None, track=False): + """Enforce signature""" + pass + + def __dealloc__(self): + cdef int rc + if self._failed_init: + return + # This simply decreases the 0MQ ref-count of zmq_msg. + with nogil: + rc = zmq_msg_close(&self.zmq_msg) + _check_rc(rc) + + # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project + + def __getbuffer__(self, Py_buffer* buffer, int flags): + # new-style (memoryview) buffer interface + buffer.buf = zmq_msg_data(&self.zmq_msg) + buffer.len = zmq_msg_size(&self.zmq_msg) + + buffer.obj = self + buffer.readonly = 1 + buffer.format = "B" + buffer.ndim = 0 + buffer.shape = NULL + buffer.strides = NULL + buffer.suboffsets = NULL + buffer.itemsize = 1 + buffer.internal = NULL + + def __getsegcount__(self, Py_ssize_t *lenp): + # required for getreadbuffer + if lenp != NULL: + lenp[0] = zmq_msg_size(&self.zmq_msg) + return 1 + + def __getreadbuffer__(self, Py_ssize_t idx, void **p): + # old-style (buffer) interface + cdef char *data_c = NULL + cdef Py_ssize_t data_len_c + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + # read-only, because we don't want to allow + # editing of the message in-place + data_c = <char *>zmq_msg_data(&self.zmq_msg) + data_len_c = zmq_msg_size(&self.zmq_msg) + if p != NULL: + p[0] = <void*>data_c + return data_len_c + + # end buffer interface + + def __copy__(self): + """Create a shallow copy of the message. + + This does not copy the contents of the Frame, just the pointer. + This will increment the 0MQ ref count of the message, but not + the ref count of the Python object. That is only done once when + the Python is first turned into a 0MQ message. + """ + return self.fast_copy() + + cdef Frame fast_copy(self): + """Fast, cdef'd version of shallow copy of the Frame.""" + cdef Frame new_msg + new_msg = Frame() + # This does not copy the contents, but just increases the ref-count + # of the zmq_msg by one. + zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg) + # Copy the ref to data so the copy won't create a copy when str is + # called. + if self._data is not None: + new_msg._data = self._data + if self._buffer is not None: + new_msg._buffer = self._buffer + if self._bytes is not None: + new_msg._bytes = self._bytes + + # Frame copies share the tracker and tracker_event + new_msg.tracker_event = self.tracker_event + new_msg.tracker = self.tracker + + return new_msg + + def __len__(self): + """Return the length of the message in bytes.""" + cdef size_t sz + sz = zmq_msg_size(&self.zmq_msg) + return sz + # return <int>zmq_msg_size(&self.zmq_msg) + + def __str__(self): + """Return the str form of the message.""" + if isinstance(self._data, bytes): + b = self._data + else: + b = self.bytes + if str is unicode: + return b.decode() + else: + return b + + cdef inline object _getbuffer(self): + """Create a Python buffer/view of the message data. + + This will be called only once, the first time the `buffer` property + is accessed. Subsequent calls use a cached copy. + """ + if self._data is None: + return viewfromobject_r(self) + else: + return viewfromobject_r(self._data) + + @property + def buffer(self): + """A read-only buffer view of the message contents.""" + if self._buffer is None: + self._buffer = self._getbuffer() + return self._buffer + + @property + def bytes(self): + """The message content as a Python bytes object. + + The first time this property is accessed, a copy of the message + contents is made. From then on that same copy of the message is + returned. + """ + if self._bytes is None: + self._bytes = copy_zmq_msg_bytes(&self.zmq_msg) + return self._bytes + + def set(self, int option, int value): + """Frame.set(option, value) + + Set a Frame option. + + See the 0MQ API documentation for zmq_msg_set + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + """ + cdef int rc = zmq_msg_set(&self.zmq_msg, option, value) + _check_rc(rc) + + def get(self, option): + """Frame.get(option) + + Get a Frame option or property. + + See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets + for details on specific options. + + .. versionadded:: libzmq-3.2 + .. versionadded:: 13.0 + + .. versionchanged:: 14.3 + add support for zmq_msg_gets (requires libzmq-4.1) + """ + cdef int rc = 0 + cdef char *property_c = NULL + cdef Py_ssize_t property_len_c = 0 + + # zmq_msg_get + if isinstance(option, int): + rc = zmq_msg_get(&self.zmq_msg, option) + _check_rc(rc) + return rc + + # zmq_msg_gets + _check_version((4,1), "get string properties") + if isinstance(option, unicode): + option = option.encode('utf8') + + if not isinstance(option, bytes): + raise TypeError("expected str, got: %r" % option) + + property_c = option + + cdef const char *result = <char *>zmq_msg_gets(&self.zmq_msg, property_c) + if result == NULL: + _check_rc(-1) + return result.decode('utf8') + +# legacy Message name +Message = Frame + +__all__ = ['Frame', 'Message'] |