summaryrefslogtreecommitdiff
path: root/zmq/sugar
diff options
context:
space:
mode:
Diffstat (limited to 'zmq/sugar')
-rw-r--r--zmq/sugar/__init__.py27
-rw-r--r--zmq/sugar/attrsettr.py52
-rw-r--r--zmq/sugar/constants.py91
-rw-r--r--zmq/sugar/context.py192
-rw-r--r--zmq/sugar/frame.py17
-rw-r--r--zmq/sugar/poll.py161
-rw-r--r--zmq/sugar/socket.py470
-rw-r--r--zmq/sugar/tracker.py120
-rw-r--r--zmq/sugar/version.py48
9 files changed, 0 insertions, 1178 deletions
diff --git a/zmq/sugar/__init__.py b/zmq/sugar/__init__.py
deleted file mode 100644
index d0510a4..0000000
--- a/zmq/sugar/__init__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-"""pure-Python sugar wrappers for core 0MQ objects."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from zmq.sugar import (
- constants, context, frame, poll, socket, tracker, version
-)
-from zmq import error
-
-__all__ = ['constants']
-for submod in (
- constants, context, error, frame, poll, socket, tracker, version
-):
- __all__.extend(submod.__all__)
-
-from zmq.error import *
-from zmq.sugar.context import *
-from zmq.sugar.tracker import *
-from zmq.sugar.socket import *
-from zmq.sugar.constants import *
-from zmq.sugar.frame import *
-from zmq.sugar.poll import *
-# from zmq.sugar.stopwatch import *
-# from zmq.sugar._device import *
-from zmq.sugar.version import *
diff --git a/zmq/sugar/attrsettr.py b/zmq/sugar/attrsettr.py
deleted file mode 100644
index 4bbd36d..0000000
--- a/zmq/sugar/attrsettr.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# coding: utf-8
-"""Mixin for mapping set/getattr to self.set/get"""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from . import constants
-
-class AttributeSetter(object):
-
- def __setattr__(self, key, value):
- """set zmq options by attribute"""
-
- # regular setattr only allowed for class-defined attributes
- for obj in [self] + self.__class__.mro():
- if key in obj.__dict__:
- object.__setattr__(self, key, value)
- return
-
- upper_key = key.upper()
- try:
- opt = getattr(constants, upper_key)
- except AttributeError:
- raise AttributeError("%s has no such option: %s" % (
- self.__class__.__name__, upper_key)
- )
- else:
- self._set_attr_opt(upper_key, opt, value)
-
- def _set_attr_opt(self, name, opt, value):
- """override if setattr should do something other than call self.set"""
- self.set(opt, value)
-
- def __getattr__(self, key):
- """get zmq options by attribute"""
- upper_key = key.upper()
- try:
- opt = getattr(constants, upper_key)
- except AttributeError:
- raise AttributeError("%s has no such option: %s" % (
- self.__class__.__name__, upper_key)
- )
- else:
- return self._get_attr_opt(upper_key, opt)
-
- def _get_attr_opt(self, name, opt):
- """override if getattr should do something other than call self.get"""
- return self.get(opt)
-
-
-__all__ = ['AttributeSetter']
diff --git a/zmq/sugar/constants.py b/zmq/sugar/constants.py
deleted file mode 100644
index 58b2263..0000000
--- a/zmq/sugar/constants.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""0MQ Constants."""
-
-# Copyright (c) IPython Development Team.
-# Distributed under the terms of the Modified BSD License.
-
-from zmq.backend import constants
-from zmq.utils.constant_names import (
- base_names,
- switched_sockopt_names,
- int_sockopt_names,
- int64_sockopt_names,
- bytes_sockopt_names,
- ctx_opt_names,
- msg_opt_names,
-)
-
-#-----------------------------------------------------------------------------
-# Python module level constants
-#-----------------------------------------------------------------------------
-
-__all__ = [
- 'int_sockopts',
- 'int64_sockopts',
- 'bytes_sockopts',
- 'ctx_opts',
- 'ctx_opt_names',
- ]
-
-int_sockopts = set()
-int64_sockopts = set()
-bytes_sockopts = set()
-ctx_opts = set()
-msg_opts = set()
-
-
-if constants.VERSION < 30000:
- int64_sockopt_names.extend(switched_sockopt_names)
-else:
- int_sockopt_names.extend(switched_sockopt_names)
-
-def _add_constant(name, container=None):
- """add a constant to be defined
-
- optionally add it to one of the sets for use in get/setopt checkers
- """
- c = getattr(constants, name, -1)
- if c == -1:
- return
- globals()[name] = c
- __all__.append(name)
- if container is not None:
- container.add(c)
- return c
-
-for name in base_names:
- _add_constant(name)
-
-for name in int_sockopt_names:
- _add_constant(name, int_sockopts)
-
-for name in int64_sockopt_names:
- _add_constant(name, int64_sockopts)
-
-for name in bytes_sockopt_names:
- _add_constant(name, bytes_sockopts)
-
-for name in ctx_opt_names:
- _add_constant(name, ctx_opts)
-
-for name in msg_opt_names:
- _add_constant(name, msg_opts)
-
-# ensure some aliases are always defined
-aliases = [
- ('DONTWAIT', 'NOBLOCK'),
- ('XREQ', 'DEALER'),
- ('XREP', 'ROUTER'),
-]
-for group in aliases:
- undefined = set()
- found = None
- for name in group:
- value = getattr(constants, name, -1)
- if value != -1:
- found = value
- else:
- undefined.add(name)
- if found is not None:
- for name in undefined:
- globals()[name] = found
- __all__.append(name)
diff --git a/zmq/sugar/context.py b/zmq/sugar/context.py
deleted file mode 100644
index 86a9c5d..0000000
--- a/zmq/sugar/context.py
+++ /dev/null
@@ -1,192 +0,0 @@
-# coding: utf-8
-"""Python bindings for 0MQ."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import atexit
-import weakref
-
-from zmq.backend import Context as ContextBase
-from . import constants
-from .attrsettr import AttributeSetter
-from .constants import ENOTSUP, ctx_opt_names
-from .socket import Socket
-from zmq.error import ZMQError
-
-from zmq.utils.interop import cast_int_addr
-
-
-class Context(ContextBase, AttributeSetter):
- """Create a zmq Context
-
- A zmq Context creates sockets via its ``ctx.socket`` method.
- """
- sockopts = None
- _instance = None
- _shadow = False
- _exiting = False
-
- def __init__(self, io_threads=1, **kwargs):
- super(Context, self).__init__(io_threads=io_threads, **kwargs)
- if kwargs.get('shadow', False):
- self._shadow = True
- else:
- self._shadow = False
- self.sockopts = {}
-
- self._exiting = False
- if not self._shadow:
- ctx_ref = weakref.ref(self)
- def _notify_atexit():
- ctx = ctx_ref()
- if ctx is not None:
- ctx._exiting = True
- atexit.register(_notify_atexit)
-
- def __del__(self):
- """deleting a Context should terminate it, without trying non-threadsafe destroy"""
- if not self._shadow and not self._exiting:
- self.term()
-
- def __enter__(self):
- return self
-
- def __exit__(self, *args, **kwargs):
- self.term()
-
- @classmethod
- def shadow(cls, address):
- """Shadow an existing libzmq context
-
- address is the integer address of the libzmq context
- or an FFI pointer to it.
-
- .. versionadded:: 14.1
- """
- address = cast_int_addr(address)
- return cls(shadow=address)
-
- @classmethod
- def shadow_pyczmq(cls, ctx):
- """Shadow an existing pyczmq context
-
- ctx is the FFI `zctx_t *` pointer
-
- .. versionadded:: 14.1
- """
- from pyczmq import zctx
-
- underlying = zctx.underlying(ctx)
- address = cast_int_addr(underlying)
- return cls(shadow=address)
-
- # static method copied from tornado IOLoop.instance
- @classmethod
- def instance(cls, io_threads=1):
- """Returns a global Context instance.
-
- Most single-threaded applications have a single, global Context.
- Use this method instead of passing around Context instances
- throughout your code.
-
- A common pattern for classes that depend on Contexts is to use
- a default argument to enable programs with multiple Contexts
- but not require the argument for simpler applications:
-
- class MyClass(object):
- def __init__(self, context=None):
- self.context = context or Context.instance()
- """
- if cls._instance is None or cls._instance.closed:
- cls._instance = cls(io_threads=io_threads)
- return cls._instance
-
- #-------------------------------------------------------------------------
- # Hooks for ctxopt completion
- #-------------------------------------------------------------------------
-
- def __dir__(self):
- keys = dir(self.__class__)
-
- for collection in (
- ctx_opt_names,
- ):
- keys.extend(collection)
- return keys
-
- #-------------------------------------------------------------------------
- # Creating Sockets
- #-------------------------------------------------------------------------
-
- @property
- def _socket_class(self):
- return Socket
-
- def socket(self, socket_type):
- """Create a Socket associated with this Context.
-
- Parameters
- ----------
- socket_type : int
- The socket type, which can be any of the 0MQ socket types:
- REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.
- """
- if self.closed:
- raise ZMQError(ENOTSUP)
- s = self._socket_class(self, socket_type)
- for opt, value in self.sockopts.items():
- try:
- s.setsockopt(opt, value)
- except ZMQError:
- # ignore ZMQErrors, which are likely for socket options
- # that do not apply to a particular socket type, e.g.
- # SUBSCRIBE for non-SUB sockets.
- pass
- return s
-
- def setsockopt(self, opt, value):
- """set default socket options for new sockets created by this Context
-
- .. versionadded:: 13.0
- """
- self.sockopts[opt] = value
-
- def getsockopt(self, opt):
- """get default socket options for new sockets created by this Context
-
- .. versionadded:: 13.0
- """
- return self.sockopts[opt]
-
- def _set_attr_opt(self, name, opt, value):
- """set default sockopts as attributes"""
- if name in constants.ctx_opt_names:
- return self.set(opt, value)
- else:
- self.sockopts[opt] = value
-
- def _get_attr_opt(self, name, opt):
- """get default sockopts as attributes"""
- if name in constants.ctx_opt_names:
- return self.get(opt)
- else:
- if opt not in self.sockopts:
- raise AttributeError(name)
- else:
- return self.sockopts[opt]
-
- def __delattr__(self, key):
- """delete default sockopts as attributes"""
- key = key.upper()
- try:
- opt = getattr(constants, key)
- except AttributeError:
- raise AttributeError("no such socket option: %s" % key)
- else:
- if opt not in self.sockopts:
- raise AttributeError(key)
- else:
- del self.sockopts[opt]
-
-__all__ = ['Context']
diff --git a/zmq/sugar/frame.py b/zmq/sugar/frame.py
deleted file mode 100644
index cae4491..0000000
--- a/zmq/sugar/frame.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# coding: utf-8
-"""0MQ Frame pure Python methods."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from .attrsettr import AttributeSetter
-from zmq.backend import Frame as FrameBase
-
-
-class Frame(FrameBase, AttributeSetter):
- pass
-
-# keep deprecated alias
-Message = Frame
-__all__ = ['Frame', 'Message'] \ No newline at end of file
diff --git a/zmq/sugar/poll.py b/zmq/sugar/poll.py
deleted file mode 100644
index c7b1d1b..0000000
--- a/zmq/sugar/poll.py
+++ /dev/null
@@ -1,161 +0,0 @@
-"""0MQ polling related functions and classes."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import zmq
-from zmq.backend import zmq_poll
-from .constants import POLLIN, POLLOUT, POLLERR
-
-#-----------------------------------------------------------------------------
-# Polling related methods
-#-----------------------------------------------------------------------------
-
-
-class Poller(object):
- """A stateful poll interface that mirrors Python's built-in poll."""
- sockets = None
- _map = {}
-
- def __init__(self):
- self.sockets = []
- self._map = {}
-
- def __contains__(self, socket):
- return socket in self._map
-
- def register(self, socket, flags=POLLIN|POLLOUT):
- """p.register(socket, flags=POLLIN|POLLOUT)
-
- Register a 0MQ socket or native fd for I/O monitoring.
-
- register(s,0) is equivalent to unregister(s).
-
- Parameters
- ----------
- socket : zmq.Socket or native socket
- A zmq.Socket or any Python object having a ``fileno()``
- method that returns a valid file descriptor.
- flags : int
- The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
- If `flags=0`, socket will be unregistered.
- """
- if flags:
- if socket in self._map:
- idx = self._map[socket]
- self.sockets[idx] = (socket, flags)
- else:
- idx = len(self.sockets)
- self.sockets.append((socket, flags))
- self._map[socket] = idx
- elif socket in self._map:
- # uregister sockets registered with no events
- self.unregister(socket)
- else:
- # ignore new sockets with no events
- pass
-
- def modify(self, socket, flags=POLLIN|POLLOUT):
- """Modify the flags for an already registered 0MQ socket or native fd."""
- self.register(socket, flags)
-
- def unregister(self, socket):
- """Remove a 0MQ socket or native fd for I/O monitoring.
-
- Parameters
- ----------
- socket : Socket
- The socket instance to stop polling.
- """
- idx = self._map.pop(socket)
- self.sockets.pop(idx)
- # shift indices after deletion
- for socket, flags in self.sockets[idx:]:
- self._map[socket] -= 1
-
- def poll(self, timeout=None):
- """Poll the registered 0MQ or native fds for I/O.
-
- Parameters
- ----------
- timeout : float, int
- The timeout in milliseconds. If None, no `timeout` (infinite). This
- is in milliseconds to be compatible with ``select.poll()``. The
- underlying zmq_poll uses microseconds and we convert to that in
- this function.
-
- Returns
- -------
- events : list of tuples
- The list of events that are ready to be processed.
- This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
- or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
- It is common to call ``events = dict(poller.poll())``,
- which turns the list of tuples into a mapping of ``socket : event``.
- """
- if timeout is None or timeout < 0:
- timeout = -1
- elif isinstance(timeout, float):
- timeout = int(timeout)
- return zmq_poll(self.sockets, timeout=timeout)
-
-
-def select(rlist, wlist, xlist, timeout=None):
- """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)
-
- Return the result of poll as a lists of sockets ready for r/w/exception.
-
- This has the same interface as Python's built-in ``select.select()`` function.
-
- Parameters
- ----------
- timeout : float, int, optional
- The timeout in seconds. If None, no timeout (infinite). This is in seconds to be
- compatible with ``select.select()``. The underlying zmq_poll uses microseconds
- and we convert to that in this function.
- rlist : list of sockets/FDs
- sockets/FDs to be polled for read events
- wlist : list of sockets/FDs
- sockets/FDs to be polled for write events
- xlist : list of sockets/FDs
- sockets/FDs to be polled for error events
-
- Returns
- -------
- (rlist, wlist, xlist) : tuple of lists of sockets (length 3)
- Lists correspond to sockets available for read/write/error events respectively.
- """
- if timeout is None:
- timeout = -1
- # Convert from sec -> us for zmq_poll.
- # zmq_poll accepts 3.x style timeout in ms
- timeout = int(timeout*1000.0)
- if timeout < 0:
- timeout = -1
- sockets = []
- for s in set(rlist + wlist + xlist):
- flags = 0
- if s in rlist:
- flags |= POLLIN
- if s in wlist:
- flags |= POLLOUT
- if s in xlist:
- flags |= POLLERR
- sockets.append((s, flags))
- return_sockets = zmq_poll(sockets, timeout)
- rlist, wlist, xlist = [], [], []
- for s, flags in return_sockets:
- if flags & POLLIN:
- rlist.append(s)
- if flags & POLLOUT:
- wlist.append(s)
- if flags & POLLERR:
- xlist.append(s)
- return rlist, wlist, xlist
-
-#-----------------------------------------------------------------------------
-# Symbols to export
-#-----------------------------------------------------------------------------
-
-__all__ = [ 'Poller', 'select' ]
diff --git a/zmq/sugar/socket.py b/zmq/sugar/socket.py
deleted file mode 100644
index c4d4df8..0000000
--- a/zmq/sugar/socket.py
+++ /dev/null
@@ -1,470 +0,0 @@
-# coding: utf-8
-"""0MQ Socket pure Python methods."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-import codecs
-import random
-import warnings
-
-import zmq
-from zmq.backend import Socket as SocketBase
-from .poll import Poller
-from . import constants
-from .attrsettr import AttributeSetter
-from zmq.error import ZMQError, ZMQBindError
-from zmq.utils import jsonapi
-from zmq.utils.strtypes import bytes,unicode,basestring
-from zmq.utils.interop import cast_int_addr
-
-from .constants import (
- SNDMORE, ENOTSUP, POLLIN,
- int64_sockopt_names,
- int_sockopt_names,
- bytes_sockopt_names,
-)
-try:
- import cPickle
- pickle = cPickle
-except:
- cPickle = None
- import pickle
-
-
-class Socket(SocketBase, AttributeSetter):
- """The ZMQ socket object
-
- To create a Socket, first create a Context::
-
- ctx = zmq.Context.instance()
-
- then call ``ctx.socket(socket_type)``::
-
- s = ctx.socket(zmq.ROUTER)
-
- """
- _shadow = False
-
- def __del__(self):
- if not self._shadow:
- self.close()
-
- #-------------------------------------------------------------------------
- # Socket creation
- #-------------------------------------------------------------------------
-
- @classmethod
- def shadow(cls, address):
- """Shadow an existing libzmq socket
-
- address is the integer address of the libzmq socket
- or an FFI pointer to it.
-
- .. versionadded:: 14.1
- """
- address = cast_int_addr(address)
- return cls(shadow=address)
-
- #-------------------------------------------------------------------------
- # Deprecated aliases
- #-------------------------------------------------------------------------
-
- @property
- def socket_type(self):
- warnings.warn("Socket.socket_type is deprecated, use Socket.type",
- DeprecationWarning
- )
- return self.type
-
- #-------------------------------------------------------------------------
- # Hooks for sockopt completion
- #-------------------------------------------------------------------------
-
- def __dir__(self):
- keys = dir(self.__class__)
- for collection in (
- bytes_sockopt_names,
- int_sockopt_names,
- int64_sockopt_names,
- ):
- keys.extend(collection)
- return keys
-
- #-------------------------------------------------------------------------
- # Getting/Setting options
- #-------------------------------------------------------------------------
- setsockopt = SocketBase.set
- getsockopt = SocketBase.get
-
- def set_string(self, option, optval, encoding='utf-8'):
- """set socket options with a unicode object
-
- This is simply a wrapper for setsockopt to protect from encoding ambiguity.
-
- See the 0MQ documentation for details on specific options.
-
- Parameters
- ----------
- option : int
- The name of the option to set. Can be any of: SUBSCRIBE,
- UNSUBSCRIBE, IDENTITY
- optval : unicode string (unicode on py2, str on py3)
- The value of the option to set.
- encoding : str
- The encoding to be used, default is utf8
- """
- if not isinstance(optval, unicode):
- raise TypeError("unicode strings only")
- return self.set(option, optval.encode(encoding))
-
- setsockopt_unicode = setsockopt_string = set_string
-
- def get_string(self, option, encoding='utf-8'):
- """get the value of a socket option
-
- See the 0MQ documentation for details on specific options.
-
- Parameters
- ----------
- option : int
- The option to retrieve.
-
- Returns
- -------
- optval : unicode string (unicode on py2, str on py3)
- The value of the option as a unicode string.
- """
-
- if option not in constants.bytes_sockopts:
- raise TypeError("option %i will not return a string to be decoded"%option)
- return self.getsockopt(option).decode(encoding)
-
- getsockopt_unicode = getsockopt_string = get_string
-
- def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100):
- """bind this socket to a random port in a range
-
- Parameters
- ----------
- addr : str
- The address string without the port to pass to ``Socket.bind()``.
- min_port : int, optional
- The minimum port in the range of ports to try (inclusive).
- max_port : int, optional
- The maximum port in the range of ports to try (exclusive).
- max_tries : int, optional
- The maximum number of bind attempts to make.
-
- Returns
- -------
- port : int
- The port the socket was bound to.
-
- Raises
- ------
- ZMQBindError
- if `max_tries` reached before successful bind
- """
- for i in range(max_tries):
- try:
- port = random.randrange(min_port, max_port)
- self.bind('%s:%s' % (addr, port))
- except ZMQError as exception:
- if not exception.errno == zmq.EADDRINUSE:
- raise
- else:
- return port
- raise ZMQBindError("Could not bind socket to random port.")
-
- def get_hwm(self):
- """get the High Water Mark
-
- On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
- """
- major = zmq.zmq_version_info()[0]
- if major >= 3:
- # return sndhwm, fallback on rcvhwm
- try:
- return self.getsockopt(zmq.SNDHWM)
- except zmq.ZMQError as e:
- pass
-
- return self.getsockopt(zmq.RCVHWM)
- else:
- return self.getsockopt(zmq.HWM)
-
- def set_hwm(self, value):
- """set the High Water Mark
-
- On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
- """
- major = zmq.zmq_version_info()[0]
- if major >= 3:
- raised = None
- try:
- self.sndhwm = value
- except Exception as e:
- raised = e
- try:
- self.rcvhwm = value
- except Exception:
- raised = e
-
- if raised:
- raise raised
- else:
- return self.setsockopt(zmq.HWM, value)
-
- hwm = property(get_hwm, set_hwm,
- """property for High Water Mark
-
- Setting hwm sets both SNDHWM and RCVHWM as appropriate.
- It gets SNDHWM if available, otherwise RCVHWM.
- """
- )
-
- #-------------------------------------------------------------------------
- # Sending and receiving messages
- #-------------------------------------------------------------------------
-
- def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
- """send a sequence of buffers as a multipart message
-
- The zmq.SNDMORE flag is added to all msg parts before the last.
-
- Parameters
- ----------
- msg_parts : iterable
- A sequence of objects to send as a multipart message. Each element
- can be any sendable object (Frame, bytes, buffer-providers)
- flags : int, optional
- SNDMORE is handled automatically for frames before the last.
- copy : bool, optional
- Should the frame(s) be sent in a copying or non-copying manner.
- track : bool, optional
- Should the frame(s) be tracked for notification that ZMQ has
- finished with it (ignored if copy=True).
-
- Returns
- -------
- None : if copy or not track
- MessageTracker : if track and not copy
- a MessageTracker object, whose `pending` property will
- be True until the last send is completed.
- """
- for msg in msg_parts[:-1]:
- self.send(msg, SNDMORE|flags, copy=copy, track=track)
- # Send the last part without the extra SNDMORE flag.
- return self.send(msg_parts[-1], flags, copy=copy, track=track)
-
- def recv_multipart(self, flags=0, copy=True, track=False):
- """receive a multipart message as a list of bytes or Frame objects
-
- Parameters
- ----------
- flags : int, optional
- Any supported flag: NOBLOCK. If NOBLOCK is set, this method
- will raise a ZMQError with EAGAIN if a message is not ready.
- If NOBLOCK is not set, then this method will block until a
- message arrives.
- copy : bool, optional
- Should the message frame(s) be received in a copying or non-copying manner?
- If False a Frame object is returned for each part, if True a copy of
- the bytes is made for each frame.
- track : bool, optional
- Should the message frame(s) be tracked for notification that ZMQ has
- finished with it? (ignored if copy=True)
-
- Returns
- -------
- msg_parts : list
- A list of frames in the multipart message; either Frames or bytes,
- depending on `copy`.
-
- """
- parts = [self.recv(flags, copy=copy, track=track)]
- # have first part already, only loop while more to receive
- while self.getsockopt(zmq.RCVMORE):
- part = self.recv(flags, copy=copy, track=track)
- parts.append(part)
-
- return parts
-
- def send_string(self, u, flags=0, copy=True, encoding='utf-8'):
- """send a Python unicode string as a message with an encoding
-
- 0MQ communicates with raw bytes, so you must encode/decode
- text (unicode on py2, str on py3) around 0MQ.
-
- Parameters
- ----------
- u : Python unicode string (unicode on py2, str on py3)
- The unicode string to send.
- flags : int, optional
- Any valid send flag.
- encoding : str [default: 'utf-8']
- The encoding to be used
- """
- if not isinstance(u, basestring):
- raise TypeError("unicode/str objects only")
- return self.send(u.encode(encoding), flags=flags, copy=copy)
-
- send_unicode = send_string
-
- def recv_string(self, flags=0, encoding='utf-8'):
- """receive a unicode string, as sent by send_string
-
- Parameters
- ----------
- flags : int
- Any valid recv flag.
- encoding : str [default: 'utf-8']
- The encoding to be used
-
- Returns
- -------
- s : unicode string (unicode on py2, str on py3)
- The Python unicode string that arrives as encoded bytes.
- """
- b = self.recv(flags=flags)
- return b.decode(encoding)
-
- recv_unicode = recv_string
-
- def send_pyobj(self, obj, flags=0, protocol=-1):
- """send a Python object as a message using pickle to serialize
-
- Parameters
- ----------
- obj : Python object
- The Python object to send.
- flags : int
- Any valid send flag.
- protocol : int
- The pickle protocol number to use. Default of -1 will select
- the highest supported number. Use 0 for multiple platform
- support.
- """
- msg = pickle.dumps(obj, protocol)
- return self.send(msg, flags)
-
- def recv_pyobj(self, flags=0):
- """receive a Python object as a message using pickle to serialize
-
- Parameters
- ----------
- flags : int
- Any valid recv flag.
-
- Returns
- -------
- obj : Python object
- The Python object that arrives as a message.
- """
- s = self.recv(flags)
- return pickle.loads(s)
-
- def send_json(self, obj, flags=0, **kwargs):
- """send a Python object as a message using json to serialize
-
- Keyword arguments are passed on to json.dumps
-
- Parameters
- ----------
- obj : Python object
- The Python object to send
- flags : int
- Any valid send flag
- """
- msg = jsonapi.dumps(obj, **kwargs)
- return self.send(msg, flags)
-
- def recv_json(self, flags=0, **kwargs):
- """receive a Python object as a message using json to serialize
-
- Keyword arguments are passed on to json.loads
-
- Parameters
- ----------
- flags : int
- Any valid recv flag.
-
- Returns
- -------
- obj : Python object
- The Python object that arrives as a message.
- """
- msg = self.recv(flags)
- return jsonapi.loads(msg, **kwargs)
-
- _poller_class = Poller
-
- def poll(self, timeout=None, flags=POLLIN):
- """poll the socket for events
-
- The default is to poll forever for incoming
- events. Timeout is in milliseconds, if specified.
-
- Parameters
- ----------
- timeout : int [default: None]
- The timeout (in milliseconds) to wait for an event. If unspecified
- (or specified None), will wait forever for an event.
- flags : bitfield (int) [default: POLLIN]
- The event flags to poll for (any combination of POLLIN|POLLOUT).
- The default is to check for incoming events (POLLIN).
-
- Returns
- -------
- events : bitfield (int)
- The events that are ready and waiting. Will be 0 if no events were ready
- by the time timeout was reached.
- """
-
- if self.closed:
- raise ZMQError(ENOTSUP)
-
- p = self._poller_class()
- p.register(self, flags)
- evts = dict(p.poll(timeout))
- # return 0 if no events, otherwise return event bitfield
- return evts.get(self, 0)
-
- def get_monitor_socket(self, events=None, addr=None):
- """Return a connected PAIR socket ready to receive the event notifications.
-
- .. versionadded:: libzmq-4.0
- .. versionadded:: 14.0
-
- Parameters
- ----------
- events : bitfield (int) [default: ZMQ_EVENTS_ALL]
- The bitmask defining which events are wanted.
- addr : string [default: None]
- The optional endpoint for the monitoring sockets.
-
- Returns
- -------
- socket : (PAIR)
- The socket is already connected and ready to receive messages.
- """
- # safe-guard, method only available on libzmq >= 4
- if zmq.zmq_version_info() < (4,):
- raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
- if addr is None:
- # create endpoint name from internal fd
- addr = "inproc://monitor.s-%d" % self.FD
- if events is None:
- # use all events
- events = zmq.EVENT_ALL
- # attach monitoring socket
- self.monitor(addr, events)
- # create new PAIR socket and connect it
- ret = self.context.socket(zmq.PAIR)
- ret.connect(addr)
- return ret
-
-
-__all__ = ['Socket']
diff --git a/zmq/sugar/tracker.py b/zmq/sugar/tracker.py
deleted file mode 100644
index fb8c007..0000000
--- a/zmq/sugar/tracker.py
+++ /dev/null
@@ -1,120 +0,0 @@
-"""Tracker for zero-copy messages with 0MQ."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-import time
-
-try:
- # below 3.3
- from threading import _Event as Event
-except (ImportError, AttributeError):
- # python throws ImportError, cython throws AttributeError
- from threading import Event
-
-from zmq.error import NotDone
-from zmq.backend import Frame
-
-class MessageTracker(object):
- """MessageTracker(*towatch)
-
- A class for tracking if 0MQ is done using one or more messages.
-
- When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread
- sends the message at some later time. Often you want to know when 0MQ has
- actually sent the message though. This is complicated by the fact that
- a single 0MQ message can be sent multiple times using different sockets.
- This class allows you to track all of the 0MQ usages of a message.
-
- Parameters
- ----------
- *towatch : tuple of Event, MessageTracker, Message instances.
- This list of objects to track. This class can track the low-level
- Events used by the Message class, other MessageTrackers or
- actual Messages.
- """
- events = None
- peers = None
-
- def __init__(self, *towatch):
- """MessageTracker(*towatch)
-
- Create a message tracker to track a set of mesages.
-
- Parameters
- ----------
- *towatch : tuple of Event, MessageTracker, Message instances.
- This list of objects to track. This class can track the low-level
- Events used by the Message class, other MessageTrackers or
- actual Messages.
- """
- self.events = set()
- self.peers = set()
- for obj in towatch:
- if isinstance(obj, Event):
- self.events.add(obj)
- elif isinstance(obj, MessageTracker):
- self.peers.add(obj)
- elif isinstance(obj, Frame):
- if not obj.tracker:
- raise ValueError("Not a tracked message")
- self.peers.add(obj.tracker)
- else:
- raise TypeError("Require Events or Message Frames, not %s"%type(obj))
-
- @property
- def done(self):
- """Is 0MQ completely done with the message(s) being tracked?"""
- for evt in self.events:
- if not evt.is_set():
- return False
- for pm in self.peers:
- if not pm.done:
- return False
- return True
-
- def wait(self, timeout=-1):
- """mt.wait(timeout=-1)
-
- Wait for 0MQ to be done with the message or until `timeout`.
-
- Parameters
- ----------
- timeout : float [default: -1, wait forever]
- Maximum time in (s) to wait before raising NotDone.
-
- Returns
- -------
- None
- if done before `timeout`
-
- Raises
- ------
- NotDone
- if `timeout` reached before I am done.
- """
- tic = time.time()
- if timeout is False or timeout < 0:
- remaining = 3600*24*7 # a week
- else:
- remaining = timeout
- done = False
- for evt in self.events:
- if remaining < 0:
- raise NotDone
- evt.wait(timeout=remaining)
- if not evt.is_set():
- raise NotDone
- toc = time.time()
- remaining -= (toc-tic)
- tic = toc
-
- for peer in self.peers:
- if remaining < 0:
- raise NotDone
- peer.wait(timeout=remaining)
- toc = time.time()
- remaining -= (toc-tic)
- tic = toc
-
-__all__ = ['MessageTracker'] \ No newline at end of file
diff --git a/zmq/sugar/version.py b/zmq/sugar/version.py
deleted file mode 100644
index 3e2de6c..0000000
--- a/zmq/sugar/version.py
+++ /dev/null
@@ -1,48 +0,0 @@
-"""PyZMQ and 0MQ version functions."""
-
-# Copyright (C) PyZMQ Developers
-# Distributed under the terms of the Modified BSD License.
-
-
-from zmq.backend import zmq_version_info
-
-
-VERSION_MAJOR = 14
-VERSION_MINOR = 3
-VERSION_PATCH = 1
-VERSION_EXTRA = ""
-__version__ = '%i.%i.%i' % (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
-
-if VERSION_EXTRA:
- __version__ = "%s-%s" % (__version__, VERSION_EXTRA)
- version_info = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH, float('inf'))
-else:
- version_info = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
-
-__revision__ = ''
-
-def pyzmq_version():
- """return the version of pyzmq as a string"""
- if __revision__:
- return '@'.join([__version__,__revision__[:6]])
- else:
- return __version__
-
-def pyzmq_version_info():
- """return the pyzmq version as a tuple of at least three numbers
-
- If pyzmq is a development version, `inf` will be appended after the third integer.
- """
- return version_info
-
-
-def zmq_version():
- """return the version of libzmq as a string"""
- return "%i.%i.%i" % zmq_version_info()
-
-
-__all__ = ['zmq_version', 'zmq_version_info',
- 'pyzmq_version','pyzmq_version_info',
- '__version__', '__revision__'
-]
-