From 44be832c5708baadd146cb954befbc3dcad8d463 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:52:45 -0500 Subject: prepare for upgrade to new upstream --- zmq/devices/__init__.py | 16 --- zmq/devices/basedevice.py | 229 ------------------------------------ zmq/devices/monitoredqueue.pxd | 177 ---------------------------- zmq/devices/monitoredqueue.py | 37 ------ zmq/devices/monitoredqueue.pyx | 103 ---------------- zmq/devices/monitoredqueuedevice.py | 66 ----------- zmq/devices/proxydevice.py | 90 -------------- 7 files changed, 718 deletions(-) delete mode 100644 zmq/devices/__init__.py delete mode 100644 zmq/devices/basedevice.py delete mode 100644 zmq/devices/monitoredqueue.pxd delete mode 100644 zmq/devices/monitoredqueue.py delete mode 100644 zmq/devices/monitoredqueue.pyx delete mode 100644 zmq/devices/monitoredqueuedevice.py delete mode 100644 zmq/devices/proxydevice.py (limited to 'zmq/devices') diff --git a/zmq/devices/__init__.py b/zmq/devices/__init__.py deleted file mode 100644 index 2371596..0000000 --- a/zmq/devices/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -"""0MQ Device classes for running in background threads or processes.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -from zmq import device -from zmq.devices import basedevice, proxydevice, monitoredqueue, monitoredqueuedevice - -from zmq.devices.basedevice import * -from zmq.devices.proxydevice import * -from zmq.devices.monitoredqueue import * -from zmq.devices.monitoredqueuedevice import * - -__all__ = ['device'] -for submod in (basedevice, proxydevice, monitoredqueue, monitoredqueuedevice): - __all__.extend(submod.__all__) diff --git a/zmq/devices/basedevice.py b/zmq/devices/basedevice.py deleted file mode 100644 index 7ba1b7a..0000000 --- a/zmq/devices/basedevice.py +++ /dev/null @@ -1,229 +0,0 @@ -"""Classes for running 0MQ Devices in the background.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - - -import time -from threading import Thread -from multiprocessing import Process - -from zmq import device, QUEUE, Context, ETERM, ZMQError - - -class Device: - """A 0MQ Device to be run in the background. - - You do not pass Socket instances to this, but rather Socket types:: - - Device(device_type, in_socket_type, out_socket_type) - - For instance:: - - dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER) - - Similar to zmq.device, but socket types instead of sockets themselves are - passed, and the sockets are created in the work thread, to avoid issues - with thread safety. As a result, additional bind_{in|out} and - connect_{in|out} methods and setsockopt_{in|out} allow users to specify - connections for the sockets. - - Parameters - ---------- - device_type : int - The 0MQ Device type - {in|out}_type : int - zmq socket types, to be passed later to context.socket(). e.g. - zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used - for both in_socket and out_socket. - - Methods - ------- - bind_{in_out}(iface) - passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread - connect_{in_out}(iface) - passthrough for ``{in|out}_socket.connect(iface)``, to be called in the - thread - setsockopt_{in_out}(opt,value) - passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in - the thread - - Attributes - ---------- - daemon : int - sets whether the thread should be run as a daemon - Default is true, because if it is false, the thread will not - exit unless it is killed - context_factory : callable (class attribute) - Function for creating the Context. This will be Context.instance - in ThreadDevices, and Context in ProcessDevices. The only reason - it is not instance() in ProcessDevices is that there may be a stale - Context instance already initialized, and the forked environment - should *never* try to use it. - """ - - context_factory = Context.instance - """Callable that returns a context. Typically either Context.instance or Context, - depending on whether the device should share the global instance or not. - """ - - def __init__(self, device_type=QUEUE, in_type=None, out_type=None): - self.device_type = device_type - if in_type is None: - raise TypeError("in_type must be specified") - if out_type is None: - raise TypeError("out_type must be specified") - self.in_type = in_type - self.out_type = out_type - self._in_binds = [] - self._in_connects = [] - self._in_sockopts = [] - self._out_binds = [] - self._out_connects = [] - self._out_sockopts = [] - self.daemon = True - self.done = False - - def bind_in(self, addr): - """Enqueue ZMQ address for binding on in_socket. - - See zmq.Socket.bind for details. - """ - self._in_binds.append(addr) - - def connect_in(self, addr): - """Enqueue ZMQ address for connecting on in_socket. - - See zmq.Socket.connect for details. - """ - self._in_connects.append(addr) - - def setsockopt_in(self, opt, value): - """Enqueue setsockopt(opt, value) for in_socket - - See zmq.Socket.setsockopt for details. - """ - self._in_sockopts.append((opt, value)) - - def bind_out(self, addr): - """Enqueue ZMQ address for binding on out_socket. - - See zmq.Socket.bind for details. - """ - self._out_binds.append(addr) - - def connect_out(self, addr): - """Enqueue ZMQ address for connecting on out_socket. - - See zmq.Socket.connect for details. - """ - self._out_connects.append(addr) - - def setsockopt_out(self, opt, value): - """Enqueue setsockopt(opt, value) for out_socket - - See zmq.Socket.setsockopt for details. - """ - self._out_sockopts.append((opt, value)) - - def _setup_sockets(self): - ctx = self.context_factory() - - self._context = ctx - - # create the sockets - ins = ctx.socket(self.in_type) - if self.out_type < 0: - outs = ins - else: - outs = ctx.socket(self.out_type) - - # set sockopts (must be done first, in case of zmq.IDENTITY) - for opt,value in self._in_sockopts: - ins.setsockopt(opt, value) - for opt,value in self._out_sockopts: - outs.setsockopt(opt, value) - - for iface in self._in_binds: - ins.bind(iface) - for iface in self._out_binds: - outs.bind(iface) - - for iface in self._in_connects: - ins.connect(iface) - for iface in self._out_connects: - outs.connect(iface) - - return ins,outs - - def run_device(self): - """The runner method. - - Do not call me directly, instead call ``self.start()``, just like a Thread. - """ - ins,outs = self._setup_sockets() - device(self.device_type, ins, outs) - - def run(self): - """wrap run_device in try/catch ETERM""" - try: - self.run_device() - except ZMQError as e: - if e.errno == ETERM: - # silence TERM errors, because this should be a clean shutdown - pass - else: - raise - finally: - self.done = True - - def start(self): - """Start the device. Override me in subclass for other launchers.""" - return self.run() - - def join(self,timeout=None): - """wait for me to finish, like Thread.join. - - Reimplemented appropriately by subclasses.""" - tic = time.time() - toc = tic - while not self.done and not (timeout is not None and toc-tic > timeout): - time.sleep(.001) - toc = time.time() - - -class BackgroundDevice(Device): - """Base class for launching Devices in background processes and threads.""" - - launcher=None - _launch_class=None - - def start(self): - self.launcher = self._launch_class(target=self.run) - self.launcher.daemon = self.daemon - return self.launcher.start() - - def join(self, timeout=None): - return self.launcher.join(timeout=timeout) - - -class ThreadDevice(BackgroundDevice): - """A Device that will be run in a background Thread. - - See Device for details. - """ - _launch_class=Thread - -class ProcessDevice(BackgroundDevice): - """A Device that will be run in a background Process. - - See Device for details. - """ - _launch_class=Process - context_factory = Context - """Callable that returns a context. Typically either Context.instance or Context, - depending on whether the device should share the global instance or not. - """ - - -__all__ = ['Device', 'ThreadDevice', 'ProcessDevice'] diff --git a/zmq/devices/monitoredqueue.pxd b/zmq/devices/monitoredqueue.pxd deleted file mode 100644 index 1e26ed8..0000000 --- a/zmq/devices/monitoredqueue.pxd +++ /dev/null @@ -1,177 +0,0 @@ -"""MonitoredQueue class declarations. - -Authors -------- -* MinRK -* Brian Granger -""" - -# -# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger -# -# This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp -# originally from libzmq-2.1.6, used under LGPLv3 -# -# pyzmq is free software; you can redistribute it and/or modify it under -# the terms of the Lesser GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# pyzmq is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Lesser GNU General Public License for more details. -# -# You should have received a copy of the Lesser GNU General Public License -# along with this program. If not, see . -# - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- - -from libzmq cimport * - -#----------------------------------------------------------------------------- -# MonitoredQueue C functions -#----------------------------------------------------------------------------- - -cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_, - zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg, - bint swap_ids) nogil: - cdef int rc - cdef int64_t flag_2 - cdef int flag_3 - cdef int flags - cdef bint more - cdef size_t flagsz - cdef void * flag_ptr - - if ZMQ_VERSION_MAJOR < 3: - flagsz = sizeof (int64_t) - flag_ptr = &flag_2 - else: - flagsz = sizeof (int) - flag_ptr = &flag_3 - - if swap_ids:# both router, must send second identity first - # recv two ids into msg, id_msg - rc = zmq_msg_recv(&msg, insocket_, 0) - if rc < 0: return rc - - rc = zmq_msg_recv(&id_msg, insocket_, 0) - if rc < 0: return rc - - # send second id (id_msg) first - #!!!! always send a copy before the original !!!! - rc = zmq_msg_copy(&side_msg, &id_msg) - if rc < 0: return rc - rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE) - if rc < 0: return rc - rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE) - if rc < 0: return rc - # send first id (msg) second - rc = zmq_msg_copy(&side_msg, &msg) - if rc < 0: return rc - rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE) - if rc < 0: return rc - rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE) - if rc < 0: return rc - while (True): - rc = zmq_msg_recv(&msg, insocket_, 0) - if rc < 0: return rc - # assert (rc == 0) - rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz) - if rc < 0: return rc - flags = 0 - if ZMQ_VERSION_MAJOR < 3: - if flag_2: - flags |= ZMQ_SNDMORE - else: - if flag_3: - flags |= ZMQ_SNDMORE - # LABEL has been removed: - # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz) - # if flag_3: - # flags |= ZMQ_SNDLABEL - # assert (rc == 0) - - rc = zmq_msg_copy(&side_msg, &msg) - if rc < 0: return rc - if flags: - rc = zmq_msg_send(&side_msg, outsocket_, flags) - if rc < 0: return rc - # only SNDMORE for side-socket - rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE) - if rc < 0: return rc - else: - rc = zmq_msg_send(&side_msg, outsocket_, 0) - if rc < 0: return rc - rc = zmq_msg_send(&msg, sidesocket_, 0) - if rc < 0: return rc - break - return rc - -# the MonitoredQueue C function, adapted from zmq::queue.cpp : -cdef inline int c_monitored_queue (void *insocket_, void *outsocket_, - void *sidesocket_, zmq_msg_t *in_msg_ptr, - zmq_msg_t *out_msg_ptr, int swap_ids) nogil: - """The actual C function for a monitored queue device. - - See ``monitored_queue()`` for details. - """ - - cdef zmq_msg_t msg - cdef int rc = zmq_msg_init (&msg) - cdef zmq_msg_t id_msg - rc = zmq_msg_init (&id_msg) - if rc < 0: return rc - cdef zmq_msg_t side_msg - rc = zmq_msg_init (&side_msg) - if rc < 0: return rc - - cdef zmq_pollitem_t items [2] - items [0].socket = insocket_ - items [0].fd = 0 - items [0].events = ZMQ_POLLIN - items [0].revents = 0 - items [1].socket = outsocket_ - items [1].fd = 0 - items [1].events = ZMQ_POLLIN - items [1].revents = 0 - # I don't think sidesocket should be polled? - # items [2].socket = sidesocket_ - # items [2].fd = 0 - # items [2].events = ZMQ_POLLIN - # items [2].revents = 0 - - while (True): - - # // Wait while there are either requests or replies to process. - rc = zmq_poll (&items [0], 2, -1) - if rc < 0: return rc - # // The algorithm below asumes ratio of request and replies processed - # // under full load to be 1:1. Although processing requests replies - # // first is tempting it is suspectible to DoS attacks (overloading - # // the system with unsolicited replies). - # - # // Process a request. - if (items [0].revents & ZMQ_POLLIN): - # send in_prefix to side socket - rc = zmq_msg_copy(&side_msg, in_msg_ptr) - if rc < 0: return rc - rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE) - if rc < 0: return rc - # relay the rest of the message - rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids) - if rc < 0: return rc - if (items [1].revents & ZMQ_POLLIN): - # send out_prefix to side socket - rc = zmq_msg_copy(&side_msg, out_msg_ptr) - if rc < 0: return rc - rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE) - if rc < 0: return rc - # relay the rest of the message - rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids) - if rc < 0: return rc - return rc diff --git a/zmq/devices/monitoredqueue.py b/zmq/devices/monitoredqueue.py deleted file mode 100644 index c6d9142..0000000 --- a/zmq/devices/monitoredqueue.py +++ /dev/null @@ -1,37 +0,0 @@ -"""pure Python monitored_queue function - -For use when Cython extension is unavailable (PyPy). - -Authors -------- -* MinRK -""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -import zmq - -def _relay(ins, outs, sides, prefix, swap_ids): - msg = ins.recv_multipart() - if swap_ids: - msg[:2] = msg[:2][::-1] - outs.send_multipart(msg) - sides.send_multipart([prefix] + msg) - -def monitored_queue(in_socket, out_socket, mon_socket, - in_prefix=b'in', out_prefix=b'out'): - - swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER - - poller = zmq.Poller() - poller.register(in_socket, zmq.POLLIN) - poller.register(out_socket, zmq.POLLIN) - while True: - events = dict(poller.poll()) - if in_socket in events: - _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids) - if out_socket in events: - _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids) - -__all__ = ['monitored_queue'] diff --git a/zmq/devices/monitoredqueue.pyx b/zmq/devices/monitoredqueue.pyx deleted file mode 100644 index d5fec64..0000000 --- a/zmq/devices/monitoredqueue.pyx +++ /dev/null @@ -1,103 +0,0 @@ -"""MonitoredQueue classes and functions. - -Authors -------- -* MinRK -* Brian Granger -""" - -#----------------------------------------------------------------------------- -# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley -# -# This file is part of pyzmq -# -# Distributed under the terms of the New BSD License. The full license is in -# the file COPYING.BSD, distributed as part of this software. -#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- - -cdef extern from "Python.h": - ctypedef int Py_ssize_t - -from libc.string cimport memcpy - -from buffers cimport asbuffer_r -from libzmq cimport * - -from zmq.backend.cython.socket cimport Socket -from zmq.backend.cython.checkrc cimport _check_rc - -from zmq import ROUTER, ZMQError - -#----------------------------------------------------------------------------- -# MonitoredQueue functions -#----------------------------------------------------------------------------- - - -def monitored_queue(Socket in_socket, Socket out_socket, Socket mon_socket, - bytes in_prefix=b'in', bytes out_prefix=b'out'): - """monitored_queue(in_socket, out_socket, mon_socket, - in_prefix=b'in', out_prefix=b'out') - - Start a monitored queue device. - - A monitored queue is very similar to the zmq.proxy device (monitored queue came first). - - Differences from zmq.proxy: - - - monitored_queue supports both in and out being ROUTER sockets - (via swapping IDENTITY prefixes). - - monitor messages are prefixed, making in and out messages distinguishable. - - Parameters - ---------- - in_socket : Socket - One of the sockets to the Queue. Its messages will be prefixed with - 'in'. - out_socket : Socket - One of the sockets to the Queue. Its messages will be prefixed with - 'out'. The only difference between in/out socket is this prefix. - mon_socket : Socket - This socket sends out every message received by each of the others - with an in/out prefix specifying which one it was. - in_prefix : str - Prefix added to broadcast messages from in_socket. - out_prefix : str - Prefix added to broadcast messages from out_socket. - """ - - cdef void *ins=in_socket.handle - cdef void *outs=out_socket.handle - cdef void *mons=mon_socket.handle - cdef zmq_msg_t in_msg - cdef zmq_msg_t out_msg - cdef bint swap_ids - cdef char *msg_c = NULL - cdef Py_ssize_t msg_c_len - cdef int rc - - # force swap_ids if both ROUTERs - swap_ids = (in_socket.type == ROUTER and out_socket.type == ROUTER) - - # build zmq_msg objects from str prefixes - asbuffer_r(in_prefix, &msg_c, &msg_c_len) - rc = zmq_msg_init_size(&in_msg, msg_c_len) - _check_rc(rc) - - memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg)) - - asbuffer_r(out_prefix, &msg_c, &msg_c_len) - - rc = zmq_msg_init_size(&out_msg, msg_c_len) - _check_rc(rc) - - with nogil: - memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg)) - rc = c_monitored_queue(ins, outs, mons, &in_msg, &out_msg, swap_ids) - _check_rc(rc) - return rc - -__all__ = ['monitored_queue'] diff --git a/zmq/devices/monitoredqueuedevice.py b/zmq/devices/monitoredqueuedevice.py deleted file mode 100644 index 9723f86..0000000 --- a/zmq/devices/monitoredqueuedevice.py +++ /dev/null @@ -1,66 +0,0 @@ -"""MonitoredQueue classes and functions.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - - -from zmq import ZMQError, PUB -from zmq.devices.proxydevice import ProxyBase, Proxy, ThreadProxy, ProcessProxy -from zmq.devices.monitoredqueue import monitored_queue - - -class MonitoredQueueBase(ProxyBase): - """Base class for overriding methods.""" - - _in_prefix = b'' - _out_prefix = b'' - - def __init__(self, in_type, out_type, mon_type=PUB, in_prefix=b'in', out_prefix=b'out'): - - ProxyBase.__init__(self, in_type=in_type, out_type=out_type, mon_type=mon_type) - - self._in_prefix = in_prefix - self._out_prefix = out_prefix - - def run_device(self): - ins,outs,mons = self._setup_sockets() - monitored_queue(ins, outs, mons, self._in_prefix, self._out_prefix) - - -class MonitoredQueue(MonitoredQueueBase, Proxy): - """Class for running monitored_queue in the background. - - See zmq.devices.Device for most of the spec. MonitoredQueue differs from Proxy, - only in that it adds a ``prefix`` to messages sent on the monitor socket, - with a different prefix for each direction. - - MQ also supports ROUTER on both sides, which zmq.proxy does not. - - If a message arrives on `in_sock`, it will be prefixed with `in_prefix` on the monitor socket. - If it arrives on out_sock, it will be prefixed with `out_prefix`. - - A PUB socket is the most logical choice for the mon_socket, but it is not required. - """ - pass - - -class ThreadMonitoredQueue(MonitoredQueueBase, ThreadProxy): - """Run zmq.monitored_queue in a background thread. - - See MonitoredQueue and Proxy for details. - """ - pass - - -class ProcessMonitoredQueue(MonitoredQueueBase, ProcessProxy): - """Run zmq.monitored_queue in a background thread. - - See MonitoredQueue and Proxy for details. - """ - - -__all__ = [ - 'MonitoredQueue', - 'ThreadMonitoredQueue', - 'ProcessMonitoredQueue' -] diff --git a/zmq/devices/proxydevice.py b/zmq/devices/proxydevice.py deleted file mode 100644 index 68be3f1..0000000 --- a/zmq/devices/proxydevice.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Proxy classes and functions.""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -import zmq -from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice - - -class ProxyBase(object): - """Base class for overriding methods.""" - - def __init__(self, in_type, out_type, mon_type=zmq.PUB): - - Device.__init__(self, in_type=in_type, out_type=out_type) - self.mon_type = mon_type - self._mon_binds = [] - self._mon_connects = [] - self._mon_sockopts = [] - - def bind_mon(self, addr): - """Enqueue ZMQ address for binding on mon_socket. - - See zmq.Socket.bind for details. - """ - self._mon_binds.append(addr) - - def connect_mon(self, addr): - """Enqueue ZMQ address for connecting on mon_socket. - - See zmq.Socket.bind for details. - """ - self._mon_connects.append(addr) - - def setsockopt_mon(self, opt, value): - """Enqueue setsockopt(opt, value) for mon_socket - - See zmq.Socket.setsockopt for details. - """ - self._mon_sockopts.append((opt, value)) - - def _setup_sockets(self): - ins,outs = Device._setup_sockets(self) - ctx = self._context - mons = ctx.socket(self.mon_type) - - # set sockopts (must be done first, in case of zmq.IDENTITY) - for opt,value in self._mon_sockopts: - mons.setsockopt(opt, value) - - for iface in self._mon_binds: - mons.bind(iface) - - for iface in self._mon_connects: - mons.connect(iface) - - return ins,outs,mons - - def run_device(self): - ins,outs,mons = self._setup_sockets() - zmq.proxy(ins, outs, mons) - -class Proxy(ProxyBase, Device): - """Threadsafe Proxy object. - - See zmq.devices.Device for most of the spec. This subclass adds a - _mon version of each _{in|out} method, for configuring the - monitor socket. - - A Proxy is a 3-socket ZMQ Device that functions just like a - QUEUE, except each message is also sent out on the monitor socket. - - A PUB socket is the most logical choice for the mon_socket, but it is not required. - """ - pass - -class ThreadProxy(ProxyBase, ThreadDevice): - """Proxy in a Thread. See Proxy for more.""" - pass - -class ProcessProxy(ProxyBase, ProcessDevice): - """Proxy in a Process. See Proxy for more.""" - pass - - -__all__ = [ - 'Proxy', - 'ThreadProxy', - 'ProcessProxy', -] -- cgit v1.2.3