From 44be832c5708baadd146cb954befbc3dcad8d463 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Tue, 11 Nov 2014 11:52:45 -0500 Subject: prepare for upgrade to new upstream --- zmq/eventloop/__init__.py | 5 - zmq/eventloop/ioloop.py | 193 ------ zmq/eventloop/minitornado/__init__.py | 0 zmq/eventloop/minitornado/concurrent.py | 11 - zmq/eventloop/minitornado/ioloop.py | 829 ------------------------ zmq/eventloop/minitornado/log.py | 6 - zmq/eventloop/minitornado/platform/__init__.py | 0 zmq/eventloop/minitornado/platform/auto.py | 45 -- zmq/eventloop/minitornado/platform/common.py | 91 --- zmq/eventloop/minitornado/platform/interface.py | 63 -- zmq/eventloop/minitornado/platform/posix.py | 70 -- zmq/eventloop/minitornado/platform/windows.py | 20 - zmq/eventloop/minitornado/stack_context.py | 376 ----------- zmq/eventloop/minitornado/util.py | 184 ------ zmq/eventloop/zmqstream.py | 529 --------------- 15 files changed, 2422 deletions(-) delete mode 100644 zmq/eventloop/__init__.py delete mode 100644 zmq/eventloop/ioloop.py delete mode 100644 zmq/eventloop/minitornado/__init__.py delete mode 100644 zmq/eventloop/minitornado/concurrent.py delete mode 100644 zmq/eventloop/minitornado/ioloop.py delete mode 100644 zmq/eventloop/minitornado/log.py delete mode 100644 zmq/eventloop/minitornado/platform/__init__.py delete mode 100644 zmq/eventloop/minitornado/platform/auto.py delete mode 100644 zmq/eventloop/minitornado/platform/common.py delete mode 100644 zmq/eventloop/minitornado/platform/interface.py delete mode 100644 zmq/eventloop/minitornado/platform/posix.py delete mode 100644 zmq/eventloop/minitornado/platform/windows.py delete mode 100644 zmq/eventloop/minitornado/stack_context.py delete mode 100644 zmq/eventloop/minitornado/util.py delete mode 100644 zmq/eventloop/zmqstream.py (limited to 'zmq/eventloop') diff --git a/zmq/eventloop/__init__.py b/zmq/eventloop/__init__.py deleted file mode 100644 index 568e8e8..0000000 --- a/zmq/eventloop/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""A Tornado based event loop for PyZMQ.""" - -from zmq.eventloop.ioloop import IOLoop - -__all__ = ['IOLoop'] \ No newline at end of file diff --git a/zmq/eventloop/ioloop.py b/zmq/eventloop/ioloop.py deleted file mode 100644 index 35f4c41..0000000 --- a/zmq/eventloop/ioloop.py +++ /dev/null @@ -1,193 +0,0 @@ -# coding: utf-8 -"""tornado IOLoop API with zmq compatibility - -If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop, -otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado. - -The minimal shipped version of tornado's IOLoop does not include -support for concurrent futures - this will only be available if you -have tornado ≥ 3.0. -""" - -# Copyright (C) PyZMQ Developers -# Distributed under the terms of the Modified BSD License. - -from __future__ import absolute_import, division, with_statement - -import os -import time -import warnings - -from zmq import ( - Poller, - POLLIN, POLLOUT, POLLERR, - ZMQError, ETERM, -) - -try: - import tornado - tornado_version = tornado.version_info -except (ImportError, AttributeError): - tornado_version = () - -try: - # tornado ≥ 3 - from tornado.ioloop import PollIOLoop, PeriodicCallback - from tornado.log import gen_log -except ImportError: - from .minitornado.ioloop import PollIOLoop, PeriodicCallback - from .minitornado.log import gen_log - - -class DelayedCallback(PeriodicCallback): - """Schedules the given callback to be called once. - - The callback is called once, after callback_time milliseconds. - - `start` must be called after the DelayedCallback is created. - - The timeout is calculated from when `start` is called. - """ - def __init__(self, callback, callback_time, io_loop=None): - # PeriodicCallback require callback_time to be positive - warnings.warn("""DelayedCallback is deprecated. - Use loop.add_timeout instead.""", DeprecationWarning) - callback_time = max(callback_time, 1e-3) - super(DelayedCallback, self).__init__(callback, callback_time, io_loop) - - def start(self): - """Starts the timer.""" - self._running = True - self._firstrun = True - self._next_timeout = time.time() + self.callback_time / 1000.0 - self.io_loop.add_timeout(self._next_timeout, self._run) - - def _run(self): - if not self._running: return - self._running = False - try: - self.callback() - except Exception: - gen_log.error("Error in delayed callback", exc_info=True) - - -class ZMQPoller(object): - """A poller that can be used in the tornado IOLoop. - - This simply wraps a regular zmq.Poller, scaling the timeout - by 1000, so that it is in seconds rather than milliseconds. - """ - - def __init__(self): - self._poller = Poller() - - @staticmethod - def _map_events(events): - """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR""" - z_events = 0 - if events & IOLoop.READ: - z_events |= POLLIN - if events & IOLoop.WRITE: - z_events |= POLLOUT - if events & IOLoop.ERROR: - z_events |= POLLERR - return z_events - - @staticmethod - def _remap_events(z_events): - """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR""" - events = 0 - if z_events & POLLIN: - events |= IOLoop.READ - if z_events & POLLOUT: - events |= IOLoop.WRITE - if z_events & POLLERR: - events |= IOLoop.ERROR - return events - - def register(self, fd, events): - return self._poller.register(fd, self._map_events(events)) - - def modify(self, fd, events): - return self._poller.modify(fd, self._map_events(events)) - - def unregister(self, fd): - return self._poller.unregister(fd) - - def poll(self, timeout): - """poll in seconds rather than milliseconds. - - Event masks will be IOLoop.READ/WRITE/ERROR - """ - z_events = self._poller.poll(1000*timeout) - return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ] - - def close(self): - pass - - -class ZMQIOLoop(PollIOLoop): - """ZMQ subclass of tornado's IOLoop""" - def initialize(self, impl=None, **kwargs): - impl = ZMQPoller() if impl is None else impl - super(ZMQIOLoop, self).initialize(impl=impl, **kwargs) - - @staticmethod - def instance(): - """Returns a global `IOLoop` instance. - - Most applications have a single, global `IOLoop` running on the - main thread. Use this method to get this instance from - another thread. To get the current thread's `IOLoop`, use `current()`. - """ - # install ZMQIOLoop as the active IOLoop implementation - # when using tornado 3 - if tornado_version >= (3,): - PollIOLoop.configure(ZMQIOLoop) - return PollIOLoop.instance() - - def start(self): - try: - super(ZMQIOLoop, self).start() - except ZMQError as e: - if e.errno == ETERM: - # quietly return on ETERM - pass - else: - raise e - - -if tornado_version >= (3,0) and tornado_version < (3,1): - def backport_close(self, all_fds=False): - """backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)""" - from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop - return mini_loop.close.__get__(self)(all_fds) - ZMQIOLoop.close = backport_close - - -# public API name -IOLoop = ZMQIOLoop - - -def install(): - """set the tornado IOLoop instance with the pyzmq IOLoop. - - After calling this function, tornado's IOLoop.instance() and pyzmq's - IOLoop.instance() will return the same object. - - An assertion error will be raised if tornado's IOLoop has been initialized - prior to calling this function. - """ - from tornado import ioloop - # check if tornado's IOLoop is already initialized to something other - # than the pyzmq IOLoop instance: - assert (not ioloop.IOLoop.initialized()) or \ - ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized" - - if tornado_version >= (3,): - # tornado 3 has an official API for registering new defaults, yay! - ioloop.IOLoop.configure(ZMQIOLoop) - else: - # we have to set the global instance explicitly - ioloop.IOLoop._instance = IOLoop.instance() - diff --git a/zmq/eventloop/minitornado/__init__.py b/zmq/eventloop/minitornado/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/zmq/eventloop/minitornado/concurrent.py b/zmq/eventloop/minitornado/concurrent.py deleted file mode 100644 index 519b23d..0000000 --- a/zmq/eventloop/minitornado/concurrent.py +++ /dev/null @@ -1,11 +0,0 @@ -"""pyzmq does not ship tornado's futures, -this just raises informative NotImplementedErrors to avoid having to change too much code. -""" - -class NotImplementedFuture(object): - def __init__(self, *args, **kwargs): - raise NotImplementedError("pyzmq does not ship tornado's Futures, " - "install tornado >= 3.0 for future support." - ) - -Future = TracebackFuture = NotImplementedFuture diff --git a/zmq/eventloop/minitornado/ioloop.py b/zmq/eventloop/minitornado/ioloop.py deleted file mode 100644 index 710a3ec..0000000 --- a/zmq/eventloop/minitornado/ioloop.py +++ /dev/null @@ -1,829 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2009 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""An I/O event loop for non-blocking sockets. - -Typical applications will use a single `IOLoop` object, in the -`IOLoop.instance` singleton. The `IOLoop.start` method should usually -be called at the end of the ``main()`` function. Atypical applications may -use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest` -case. - -In addition to I/O events, the `IOLoop` can also schedule time-based events. -`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`. -""" - -from __future__ import absolute_import, division, print_function, with_statement - -import datetime -import errno -import functools -import heapq -import logging -import numbers -import os -import select -import sys -import threading -import time -import traceback - -from .concurrent import Future, TracebackFuture -from .log import app_log, gen_log -from . import stack_context -from .util import Configurable - -try: - import signal -except ImportError: - signal = None - -try: - import thread # py2 -except ImportError: - import _thread as thread # py3 - -from .platform.auto import set_close_exec, Waker - - -class TimeoutError(Exception): - pass - - -class IOLoop(Configurable): - """A level-triggered I/O loop. - - We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they - are available, or else we fall back on select(). If you are - implementing a system that needs to handle thousands of - simultaneous connections, you should use a system that supports - either ``epoll`` or ``kqueue``. - - Example usage for a simple TCP server:: - - import errno - import functools - import ioloop - import socket - - def connection_ready(sock, fd, events): - while True: - try: - connection, address = sock.accept() - except socket.error, e: - if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): - raise - return - connection.setblocking(0) - handle_connection(connection, address) - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setblocking(0) - sock.bind(("", port)) - sock.listen(128) - - io_loop = ioloop.IOLoop.instance() - callback = functools.partial(connection_ready, sock) - io_loop.add_handler(sock.fileno(), callback, io_loop.READ) - io_loop.start() - - """ - # Constants from the epoll module - _EPOLLIN = 0x001 - _EPOLLPRI = 0x002 - _EPOLLOUT = 0x004 - _EPOLLERR = 0x008 - _EPOLLHUP = 0x010 - _EPOLLRDHUP = 0x2000 - _EPOLLONESHOT = (1 << 30) - _EPOLLET = (1 << 31) - - # Our events map exactly to the epoll events - NONE = 0 - READ = _EPOLLIN - WRITE = _EPOLLOUT - ERROR = _EPOLLERR | _EPOLLHUP - - # Global lock for creating global IOLoop instance - _instance_lock = threading.Lock() - - _current = threading.local() - - @staticmethod - def instance(): - """Returns a global `IOLoop` instance. - - Most applications have a single, global `IOLoop` running on the - main thread. Use this method to get this instance from - another thread. To get the current thread's `IOLoop`, use `current()`. - """ - if not hasattr(IOLoop, "_instance"): - with IOLoop._instance_lock: - if not hasattr(IOLoop, "_instance"): - # New instance after double check - IOLoop._instance = IOLoop() - return IOLoop._instance - - @staticmethod - def initialized(): - """Returns true if the singleton instance has been created.""" - return hasattr(IOLoop, "_instance") - - def install(self): - """Installs this `IOLoop` object as the singleton instance. - - This is normally not necessary as `instance()` will create - an `IOLoop` on demand, but you may want to call `install` to use - a custom subclass of `IOLoop`. - """ - assert not IOLoop.initialized() - IOLoop._instance = self - - @staticmethod - def current(): - """Returns the current thread's `IOLoop`. - - If an `IOLoop` is currently running or has been marked as current - by `make_current`, returns that instance. Otherwise returns - `IOLoop.instance()`, i.e. the main thread's `IOLoop`. - - A common pattern for classes that depend on ``IOLoops`` is to use - a default argument to enable programs with multiple ``IOLoops`` - but not require the argument for simpler applications:: - - class MyClass(object): - def __init__(self, io_loop=None): - self.io_loop = io_loop or IOLoop.current() - - In general you should use `IOLoop.current` as the default when - constructing an asynchronous object, and use `IOLoop.instance` - when you mean to communicate to the main thread from a different - one. - """ - current = getattr(IOLoop._current, "instance", None) - if current is None: - return IOLoop.instance() - return current - - def make_current(self): - """Makes this the `IOLoop` for the current thread. - - An `IOLoop` automatically becomes current for its thread - when it is started, but it is sometimes useful to call - `make_current` explictly before starting the `IOLoop`, - so that code run at startup time can find the right - instance. - """ - IOLoop._current.instance = self - - @staticmethod - def clear_current(): - IOLoop._current.instance = None - - @classmethod - def configurable_base(cls): - return IOLoop - - @classmethod - def configurable_default(cls): - # this is the only patch to IOLoop: - from zmq.eventloop.ioloop import ZMQIOLoop - return ZMQIOLoop - # the remainder of this method is unused, - # but left for preservation reasons - if hasattr(select, "epoll"): - from tornado.platform.epoll import EPollIOLoop - return EPollIOLoop - if hasattr(select, "kqueue"): - # Python 2.6+ on BSD or Mac - from tornado.platform.kqueue import KQueueIOLoop - return KQueueIOLoop - from tornado.platform.select import SelectIOLoop - return SelectIOLoop - - def initialize(self): - pass - - def close(self, all_fds=False): - """Closes the `IOLoop`, freeing any resources used. - - If ``all_fds`` is true, all file descriptors registered on the - IOLoop will be closed (not just the ones created by the - `IOLoop` itself). - - Many applications will only use a single `IOLoop` that runs for the - entire lifetime of the process. In that case closing the `IOLoop` - is not necessary since everything will be cleaned up when the - process exits. `IOLoop.close` is provided mainly for scenarios - such as unit tests, which create and destroy a large number of - ``IOLoops``. - - An `IOLoop` must be completely stopped before it can be closed. This - means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must - be allowed to return before attempting to call `IOLoop.close()`. - Therefore the call to `close` will usually appear just after - the call to `start` rather than near the call to `stop`. - - .. versionchanged:: 3.1 - If the `IOLoop` implementation supports non-integer objects - for "file descriptors", those objects will have their - ``close`` method when ``all_fds`` is true. - """ - raise NotImplementedError() - - def add_handler(self, fd, handler, events): - """Registers the given handler to receive the given events for fd. - - The ``events`` argument is a bitwise or of the constants - ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``. - - When an event occurs, ``handler(fd, events)`` will be run. - """ - raise NotImplementedError() - - def update_handler(self, fd, events): - """Changes the events we listen for fd.""" - raise NotImplementedError() - - def remove_handler(self, fd): - """Stop listening for events on fd.""" - raise NotImplementedError() - - def set_blocking_signal_threshold(self, seconds, action): - """Sends a signal if the `IOLoop` is blocked for more than - ``s`` seconds. - - Pass ``seconds=None`` to disable. Requires Python 2.6 on a unixy - platform. - - The action parameter is a Python signal handler. Read the - documentation for the `signal` module for more information. - If ``action`` is None, the process will be killed if it is - blocked for too long. - """ - raise NotImplementedError() - - def set_blocking_log_threshold(self, seconds): - """Logs a stack trace if the `IOLoop` is blocked for more than - ``s`` seconds. - - Equivalent to ``set_blocking_signal_threshold(seconds, - self.log_stack)`` - """ - self.set_blocking_signal_threshold(seconds, self.log_stack) - - def log_stack(self, signal, frame): - """Signal handler to log the stack trace of the current thread. - - For use with `set_blocking_signal_threshold`. - """ - gen_log.warning('IOLoop blocked for %f seconds in\n%s', - self._blocking_signal_threshold, - ''.join(traceback.format_stack(frame))) - - def start(self): - """Starts the I/O loop. - - The loop will run until one of the callbacks calls `stop()`, which - will make the loop stop after the current event iteration completes. - """ - raise NotImplementedError() - - def stop(self): - """Stop the I/O loop. - - If the event loop is not currently running, the next call to `start()` - will return immediately. - - To use asynchronous methods from otherwise-synchronous code (such as - unit tests), you can start and stop the event loop like this:: - - ioloop = IOLoop() - async_method(ioloop=ioloop, callback=ioloop.stop) - ioloop.start() - - ``ioloop.start()`` will return after ``async_method`` has run - its callback, whether that callback was invoked before or - after ``ioloop.start``. - - Note that even after `stop` has been called, the `IOLoop` is not - completely stopped until `IOLoop.start` has also returned. - Some work that was scheduled before the call to `stop` may still - be run before the `IOLoop` shuts down. - """ - raise NotImplementedError() - - def run_sync(self, func, timeout=None): - """Starts the `IOLoop`, runs the given function, and stops the loop. - - If the function returns a `.Future`, the `IOLoop` will run - until the future is resolved. If it raises an exception, the - `IOLoop` will stop and the exception will be re-raised to the - caller. - - The keyword-only argument ``timeout`` may be used to set - a maximum duration for the function. If the timeout expires, - a `TimeoutError` is raised. - - This method is useful in conjunction with `tornado.gen.coroutine` - to allow asynchronous calls in a ``main()`` function:: - - @gen.coroutine - def main(): - # do stuff... - - if __name__ == '__main__': - IOLoop.instance().run_sync(main) - """ - future_cell = [None] - - def run(): - try: - result = func() - except Exception: - future_cell[0] = TracebackFuture() - future_cell[0].set_exc_info(sys.exc_info()) - else: - if isinstance(result, Future): - future_cell[0] = result - else: - future_cell[0] = Future() - future_cell[0].set_result(result) - self.add_future(future_cell[0], lambda future: self.stop()) - self.add_callback(run) - if timeout is not None: - timeout_handle = self.add_timeout(self.time() + timeout, self.stop) - self.start() - if timeout is not None: - self.remove_timeout(timeout_handle) - if not future_cell[0].done(): - raise TimeoutError('Operation timed out after %s seconds' % timeout) - return future_cell[0].result() - - def time(self): - """Returns the current time according to the `IOLoop`'s clock. - - The return value is a floating-point number relative to an - unspecified time in the past. - - By default, the `IOLoop`'s time function is `time.time`. However, - it may be configured to use e.g. `time.monotonic` instead. - Calls to `add_timeout` that pass a number instead of a - `datetime.timedelta` should use this function to compute the - appropriate time, so they can work no matter what time function - is chosen. - """ - return time.time() - - def add_timeout(self, deadline, callback): - """Runs the ``callback`` at the time ``deadline`` from the I/O loop. - - Returns an opaque handle that may be passed to - `remove_timeout` to cancel. - - ``deadline`` may be a number denoting a time (on the same - scale as `IOLoop.time`, normally `time.time`), or a - `datetime.timedelta` object for a deadline relative to the - current time. - - Note that it is not safe to call `add_timeout` from other threads. - Instead, you must use `add_callback` to transfer control to the - `IOLoop`'s thread, and then call `add_timeout` from there. - """ - raise NotImplementedError() - - def remove_timeout(self, timeout): - """Cancels a pending timeout. - - The argument is a handle as returned by `add_timeout`. It is - safe to call `remove_timeout` even if the callback has already - been run. - """ - raise NotImplementedError() - - def add_callback(self, callback, *args, **kwargs): - """Calls the given callback on the next I/O loop iteration. - - It is safe to call this method from any thread at any time, - except from a signal handler. Note that this is the **only** - method in `IOLoop` that makes this thread-safety guarantee; all - other interaction with the `IOLoop` must be done from that - `IOLoop`'s thread. `add_callback()` may be used to transfer - control from other threads to the `IOLoop`'s thread. - - To add a callback from a signal handler, see - `add_callback_from_signal`. - """ - raise NotImplementedError() - - def add_callback_from_signal(self, callback, *args, **kwargs): - """Calls the given callback on the next I/O loop iteration. - - Safe for use from a Python signal handler; should not be used - otherwise. - - Callbacks added with this method will be run without any - `.stack_context`, to avoid picking up the context of the function - that was interrupted by the signal. - """ - raise NotImplementedError() - - def add_future(self, future, callback): - """Schedules a callback on the ``IOLoop`` when the given - `.Future` is finished. - - The callback is invoked with one argument, the - `.Future`. - """ - assert isinstance(future, Future) - callback = stack_context.wrap(callback) - future.add_done_callback( - lambda future: self.add_callback(callback, future)) - - def _run_callback(self, callback): - """Runs a callback with error handling. - - For use in subclasses. - """ - try: - callback() - except Exception: - self.handle_callback_exception(callback) - - def handle_callback_exception(self, callback): - """This method is called whenever a callback run by the `IOLoop` - throws an exception. - - By default simply logs the exception as an error. Subclasses - may override this method to customize reporting of exceptions. - - The exception itself is not passed explicitly, but is available - in `sys.exc_info`. - """ - app_log.error("Exception in callback %r", callback, exc_info=True) - - -class PollIOLoop(IOLoop): - """Base class for IOLoops built around a select-like function. - - For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` - (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or - `tornado.platform.select.SelectIOLoop` (all platforms). - """ - def initialize(self, impl, time_func=None): - super(PollIOLoop, self).initialize() - self._impl = impl - if hasattr(self._impl, 'fileno'): - set_close_exec(self._impl.fileno()) - self.time_func = time_func or time.time - self._handlers = {} - self._events = {} - self._callbacks = [] - self._callback_lock = threading.Lock() - self._timeouts = [] - self._cancellations = 0 - self._running = False - self._stopped = False - self._closing = False - self._thread_ident = None - self._blocking_signal_threshold = None - - # Create a pipe that we send bogus data to when we want to wake - # the I/O loop when it is idle - self._waker = Waker() - self.add_handler(self._waker.fileno(), - lambda fd, events: self._waker.consume(), - self.READ) - - def close(self, all_fds=False): - with self._callback_lock: - self._closing = True - self.remove_handler(self._waker.fileno()) - if all_fds: - for fd in self._handlers.keys(): - try: - close_method = getattr(fd, 'close', None) - if close_method is not None: - close_method() - else: - os.close(fd) - except Exception: - gen_log.debug("error closing fd %s", fd, exc_info=True) - self._waker.close() - self._impl.close() - - def add_handler(self, fd, handler, events): - self._handlers[fd] = stack_context.wrap(handler) - self._impl.register(fd, events | self.ERROR) - - def update_handler(self, fd, events): - self._impl.modify(fd, events | self.ERROR) - - def remove_handler(self, fd): - self._handlers.pop(fd, None) - self._events.pop(fd, None) - try: - self._impl.unregister(fd) - except Exception: - gen_log.debug("Error deleting fd from IOLoop", exc_info=True) - - def set_blocking_signal_threshold(self, seconds, action): - if not hasattr(signal, "setitimer"): - gen_log.error("set_blocking_signal_threshold requires a signal module " - "with the setitimer method") - return - self._blocking_signal_threshold = seconds - if seconds is not None: - signal.signal(signal.SIGALRM, - action if action is not None else signal.SIG_DFL) - - def start(self): - if not logging.getLogger().handlers: - # The IOLoop catches and logs exceptions, so it's - # important that log output be visible. However, python's - # default behavior for non-root loggers (prior to python - # 3.2) is to print an unhelpful "no handlers could be - # found" message rather than the actual log entry, so we - # must explicitly configure logging if we've made it this - # far without anything. - logging.basicConfig() - if self._stopped: - self._stopped = False - return - old_current = getattr(IOLoop._current, "instance", None) - IOLoop._current.instance = self - self._thread_ident = thread.get_ident() - self._running = True - - # signal.set_wakeup_fd closes a race condition in event loops: - # a signal may arrive at the beginning of select/poll/etc - # before it goes into its interruptible sleep, so the signal - # will be consumed without waking the select. The solution is - # for the (C, synchronous) signal handler to write to a pipe, - # which will then be seen by select. - # - # In python's signal handling semantics, this only matters on the - # main thread (fortunately, set_wakeup_fd only works on the main - # thread and will raise a ValueError otherwise). - # - # If someone has already set a wakeup fd, we don't want to - # disturb it. This is an issue for twisted, which does its - # SIGCHILD processing in response to its own wakeup fd being - # written to. As long as the wakeup fd is registered on the IOLoop, - # the loop will still wake up and everything should work. - old_wakeup_fd = None - if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': - # requires python 2.6+, unix. set_wakeup_fd exists but crashes - # the python process on windows. - try: - old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) - if old_wakeup_fd != -1: - # Already set, restore previous value. This is a little racy, - # but there's no clean get_wakeup_fd and in real use the - # IOLoop is just started once at the beginning. - signal.set_wakeup_fd(old_wakeup_fd) - old_wakeup_fd = None - except ValueError: # non-main thread - pass - - while True: - poll_timeout = 3600.0 - - # Prevent IO event starvation by delaying new callbacks - # to the next iteration of the event loop. - with self._callback_lock: - callbacks = self._callbacks - self._callbacks = [] - for callback in callbacks: - self._run_callback(callback) - - if self._timeouts: - now = self.time() - while self._timeouts: - if self._timeouts[0].callback is None: - # the timeout was cancelled - heapq.heappop(self._timeouts) - self._cancellations -= 1 - elif self._timeouts[0].deadline <= now: - timeout = heapq.heappop(self._timeouts) - self._run_callback(timeout.callback) - else: - seconds = self._timeouts[0].deadline - now - poll_timeout = min(seconds, poll_timeout) - break - if (self._cancellations > 512 - and self._cancellations > (len(self._timeouts) >> 1)): - # Clean up the timeout queue when it gets large and it's - # more than half cancellations. - self._cancellations = 0 - self._timeouts = [x for x in self._timeouts - if x.callback is not None] - heapq.heapify(self._timeouts) - - if self._callbacks: - # If any callbacks or timeouts called add_callback, - # we don't want to wait in poll() before we run them. - poll_timeout = 0.0 - - if not self._running: - break - - if self._blocking_signal_threshold is not None: - # clear alarm so it doesn't fire while poll is waiting for - # events. - signal.setitimer(signal.ITIMER_REAL, 0, 0) - - try: - event_pairs = self._impl.poll(poll_timeout) - except Exception as e: - # Depending on python version and IOLoop implementation, - # different exception types may be thrown and there are - # two ways EINTR might be signaled: - # * e.errno == errno.EINTR - # * e.args is like (errno.EINTR, 'Interrupted system call') - if (getattr(e, 'errno', None) == errno.EINTR or - (isinstance(getattr(e, 'args', None), tuple) and - len(e.args) == 2 and e.args[0] == errno.EINTR)): - continue - else: - raise - - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, - self._blocking_signal_threshold, 0) - - # Pop one fd at a time from the set of pending fds and run - # its handler. Since that handler may perform actions on - # other file descriptors, there may be reentrant calls to - # this IOLoop that update self._events - self._events.update(event_pairs) - while self._events: - fd, events = self._events.popitem() - try: - self._handlers[fd](fd, events) - except (OSError, IOError) as e: - if e.args[0] == errno.EPIPE: - # Happens when the client closes the connection - pass - else: - app_log.error("Exception in I/O handler for fd %s", - fd, exc_info=True) - except Exception: - app_log.error("Exception in I/O handler for fd %s", - fd, exc_info=True) - # reset the stopped flag so another start/stop pair can be issued - self._stopped = False - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, 0, 0) - IOLoop._current.instance = old_current - if old_wakeup_fd is not None: - signal.set_wakeup_fd(old_wakeup_fd) - - def stop(self): - self._running = False - self._stopped = True - self._waker.wake() - - def time(self): - return self.time_func() - - def add_timeout(self, deadline, callback): - timeout = _Timeout(deadline, stack_context.wrap(callback), self) - heapq.heappush(self._timeouts, timeout) - return timeout - - def remove_timeout(self, timeout): - # Removing from a heap is complicated, so just leave the defunct - # timeout object in the queue (see discussion in - # http://docs.python.org/library/heapq.html). - # If this turns out to be a problem, we could add a garbage - # collection pass whenever there are too many dead timeouts. - timeout.callback = None - self._cancellations += 1 - - def add_callback(self, callback, *args, **kwargs): - with self._callback_lock: - if self._closing: - raise RuntimeError("IOLoop is closing") - list_empty = not self._callbacks - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - if list_empty and thread.get_ident() != self._thread_ident: - # If we're in the IOLoop's thread, we know it's not currently - # polling. If we're not, and we added the first callback to an - # empty list, we may need to wake it up (it may wake up on its - # own, but an occasional extra wake is harmless). Waking - # up a polling IOLoop is relatively expensive, so we try to - # avoid it when we can. - self._waker.wake() - - def add_callback_from_signal(self, callback, *args, **kwargs): - with stack_context.NullContext(): - if thread.get_ident() != self._thread_ident: - # if the signal is handled on another thread, we can add - # it normally (modulo the NullContext) - self.add_callback(callback, *args, **kwargs) - else: - # If we're on the IOLoop's thread, we cannot use - # the regular add_callback because it may deadlock on - # _callback_lock. Blindly insert into self._callbacks. - # This is safe because the GIL makes list.append atomic. - # One subtlety is that if the signal interrupted the - # _callback_lock block in IOLoop.start, we may modify - # either the old or new version of self._callbacks, - # but either way will work. - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - - -class _Timeout(object): - """An IOLoop timeout, a UNIX timestamp and a callback""" - - # Reduce memory overhead when there are lots of pending callbacks - __slots__ = ['deadline', 'callback'] - - def __init__(self, deadline, callback, io_loop): - if isinstance(deadline, numbers.Real): - self.deadline = deadline - elif isinstance(deadline, datetime.timedelta): - self.deadline = io_loop.time() + _Timeout.timedelta_to_seconds(deadline) - else: - raise TypeError("Unsupported deadline %r" % deadline) - self.callback = callback - - @staticmethod - def timedelta_to_seconds(td): - """Equivalent to td.total_seconds() (introduced in python 2.7).""" - return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - - # Comparison methods to sort by deadline, with object id as a tiebreaker - # to guarantee a consistent ordering. The heapq module uses __le__ - # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons - # use __lt__). - def __lt__(self, other): - return ((self.deadline, id(self)) < - (other.deadline, id(other))) - - def __le__(self, other): - return ((self.deadline, id(self)) <= - (other.deadline, id(other))) - - -class PeriodicCallback(object): - """Schedules the given callback to be called periodically. - - The callback is called every ``callback_time`` milliseconds. - - `start` must be called after the `PeriodicCallback` is created. - """ - def __init__(self, callback, callback_time, io_loop=None): - self.callback = callback - if callback_time <= 0: - raise ValueError("Periodic callback must have a positive callback_time") - self.callback_time = callback_time - self.io_loop = io_loop or IOLoop.current() - self._running = False - self._timeout = None - - def start(self): - """Starts the timer.""" - self._running = True - self._next_timeout = self.io_loop.time() - self._schedule_next() - - def stop(self): - """Stops the timer.""" - self._running = False - if self._timeout is not None: - self.io_loop.remove_timeout(self._timeout) - self._timeout = None - - def _run(self): - if not self._running: - return - try: - self.callback() - except Exception: - app_log.error("Error in periodic callback", exc_info=True) - self._schedule_next() - - def _schedule_next(self): - if self._running: - current_time = self.io_loop.time() - while self._next_timeout <= current_time: - self._next_timeout += self.callback_time / 1000.0 - self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) diff --git a/zmq/eventloop/minitornado/log.py b/zmq/eventloop/minitornado/log.py deleted file mode 100644 index 49051e8..0000000 --- a/zmq/eventloop/minitornado/log.py +++ /dev/null @@ -1,6 +0,0 @@ -"""minimal subset of tornado.log for zmq.eventloop.minitornado""" - -import logging - -app_log = logging.getLogger("tornado.application") -gen_log = logging.getLogger("tornado.general") diff --git a/zmq/eventloop/minitornado/platform/__init__.py b/zmq/eventloop/minitornado/platform/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/zmq/eventloop/minitornado/platform/auto.py b/zmq/eventloop/minitornado/platform/auto.py deleted file mode 100644 index b40ccd9..0000000 --- a/zmq/eventloop/minitornado/platform/auto.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2011 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Implementation of platform-specific functionality. - -For each function or class described in `tornado.platform.interface`, -the appropriate platform-specific implementation exists in this module. -Most code that needs access to this functionality should do e.g.:: - - from tornado.platform.auto import set_close_exec -""" - -from __future__ import absolute_import, division, print_function, with_statement - -import os - -if os.name == 'nt': - from .common import Waker - from .windows import set_close_exec -else: - from .posix import set_close_exec, Waker - -try: - # monotime monkey-patches the time module to have a monotonic function - # in versions of python before 3.3. - import monotime -except ImportError: - pass -try: - from time import monotonic as monotonic_time -except ImportError: - monotonic_time = None diff --git a/zmq/eventloop/minitornado/platform/common.py b/zmq/eventloop/minitornado/platform/common.py deleted file mode 100644 index 2d75dc1..0000000 --- a/zmq/eventloop/minitornado/platform/common.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Lowest-common-denominator implementations of platform functionality.""" -from __future__ import absolute_import, division, print_function, with_statement - -import errno -import socket - -from . import interface - - -class Waker(interface.Waker): - """Create an OS independent asynchronous pipe. - - For use on platforms that don't have os.pipe() (or where pipes cannot - be passed to select()), but do have sockets. This includes Windows - and Jython. - """ - def __init__(self): - # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py - - self.writer = socket.socket() - # Disable buffering -- pulling the trigger sends 1 byte, - # and we want that sent immediately, to wake up ASAP. - self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - count = 0 - while 1: - count += 1 - # Bind to a local port; for efficiency, let the OS pick - # a free port for us. - # Unfortunately, stress tests showed that we may not - # be able to connect to that port ("Address already in - # use") despite that the OS picked it. This appears - # to be a race bug in the Windows socket implementation. - # So we loop until a connect() succeeds (almost always - # on the first try). See the long thread at - # http://mail.zope.org/pipermail/zope/2005-July/160433.html - # for hideous details. - a = socket.socket() - a.bind(("127.0.0.1", 0)) - a.listen(1) - connect_address = a.getsockname() # assigned (host, port) pair - try: - self.writer.connect(connect_address) - break # success - except socket.error as detail: - if (not hasattr(errno, 'WSAEADDRINUSE') or - detail[0] != errno.WSAEADDRINUSE): - # "Address already in use" is the only error - # I've seen on two WinXP Pro SP2 boxes, under - # Pythons 2.3.5 and 2.4.1. - raise - # (10048, 'Address already in use') - # assert count <= 2 # never triggered in Tim's tests - if count >= 10: # I've never seen it go above 2 - a.close() - self.writer.close() - raise socket.error("Cannot bind trigger!") - # Close `a` and try again. Note: I originally put a short - # sleep() here, but it didn't appear to help or hurt. - a.close() - - self.reader, addr = a.accept() - self.reader.setblocking(0) - self.writer.setblocking(0) - a.close() - self.reader_fd = self.reader.fileno() - - def fileno(self): - return self.reader.fileno() - - def write_fileno(self): - return self.writer.fileno() - - def wake(self): - try: - self.writer.send(b"x") - except (IOError, socket.error): - pass - - def consume(self): - try: - while True: - result = self.reader.recv(1024) - if not result: - break - except (IOError, socket.error): - pass - - def close(self): - self.reader.close() - self.writer.close() diff --git a/zmq/eventloop/minitornado/platform/interface.py b/zmq/eventloop/minitornado/platform/interface.py deleted file mode 100644 index 07da6ba..0000000 --- a/zmq/eventloop/minitornado/platform/interface.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2011 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Interfaces for platform-specific functionality. - -This module exists primarily for documentation purposes and as base classes -for other tornado.platform modules. Most code should import the appropriate -implementation from `tornado.platform.auto`. -""" - -from __future__ import absolute_import, division, print_function, with_statement - - -def set_close_exec(fd): - """Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor.""" - raise NotImplementedError() - - -class Waker(object): - """A socket-like object that can wake another thread from ``select()``. - - The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to - its ``select`` (or ``epoll`` or ``kqueue``) calls. When another - thread wants to wake up the loop, it calls `wake`. Once it has woken - up, it will call `consume` to do any necessary per-wake cleanup. When - the ``IOLoop`` is closed, it closes its waker too. - """ - def fileno(self): - """Returns the read file descriptor for this waker. - - Must be suitable for use with ``select()`` or equivalent on the - local platform. - """ - raise NotImplementedError() - - def write_fileno(self): - """Returns the write file descriptor for this waker.""" - raise NotImplementedError() - - def wake(self): - """Triggers activity on the waker's file descriptor.""" - raise NotImplementedError() - - def consume(self): - """Called after the listen has woken up to do any necessary cleanup.""" - raise NotImplementedError() - - def close(self): - """Closes the waker's file descriptor(s).""" - raise NotImplementedError() diff --git a/zmq/eventloop/minitornado/platform/posix.py b/zmq/eventloop/minitornado/platform/posix.py deleted file mode 100644 index ccffbb6..0000000 --- a/zmq/eventloop/minitornado/platform/posix.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2011 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Posix implementations of platform-specific functionality.""" - -from __future__ import absolute_import, division, print_function, with_statement - -import fcntl -import os - -from . import interface - - -def set_close_exec(fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) - - -def _set_nonblocking(fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - -class Waker(interface.Waker): - def __init__(self): - r, w = os.pipe() - _set_nonblocking(r) - _set_nonblocking(w) - set_close_exec(r) - set_close_exec(w) - self.reader = os.fdopen(r, "rb", 0) - self.writer = os.fdopen(w, "wb", 0) - - def fileno(self): - return self.reader.fileno() - - def write_fileno(self): - return self.writer.fileno() - - def wake(self): - try: - self.writer.write(b"x") - except IOError: - pass - - def consume(self): - try: - while True: - result = self.reader.read() - if not result: - break - except IOError: - pass - - def close(self): - self.reader.close() - self.writer.close() diff --git a/zmq/eventloop/minitornado/platform/windows.py b/zmq/eventloop/minitornado/platform/windows.py deleted file mode 100644 index 817bdca..0000000 --- a/zmq/eventloop/minitornado/platform/windows.py +++ /dev/null @@ -1,20 +0,0 @@ -# NOTE: win32 support is currently experimental, and not recommended -# for production use. - - -from __future__ import absolute_import, division, print_function, with_statement -import ctypes -import ctypes.wintypes - -# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx -SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation -SetHandleInformation.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD) -SetHandleInformation.restype = ctypes.wintypes.BOOL - -HANDLE_FLAG_INHERIT = 0x00000001 - - -def set_close_exec(fd): - success = SetHandleInformation(fd, HANDLE_FLAG_INHERIT, 0) - if not success: - raise ctypes.GetLastError() diff --git a/zmq/eventloop/minitornado/stack_context.py b/zmq/eventloop/minitornado/stack_context.py deleted file mode 100644 index 226d804..0000000 --- a/zmq/eventloop/minitornado/stack_context.py +++ /dev/null @@ -1,376 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2010 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""`StackContext` allows applications to maintain threadlocal-like state -that follows execution as it moves to other execution contexts. - -The motivating examples are to eliminate the need for explicit -``async_callback`` wrappers (as in `tornado.web.RequestHandler`), and to -allow some additional context to be kept for logging. - -This is slightly magic, but it's an extension of the idea that an -exception handler is a kind of stack-local state and when that stack -is suspended and resumed in a new context that state needs to be -preserved. `StackContext` shifts the burden of restoring that state -from each call site (e.g. wrapping each `.AsyncHTTPClient` callback -in ``async_callback``) to the mechanisms that transfer control from -one context to another (e.g. `.AsyncHTTPClient` itself, `.IOLoop`, -thread pools, etc). - -Example usage:: - - @contextlib.contextmanager - def die_on_error(): - try: - yield - except Exception: - logging.error("exception in asynchronous operation",exc_info=True) - sys.exit(1) - - with StackContext(die_on_error): - # Any exception thrown here *or in callback and its desendents* - # will cause the process to exit instead of spinning endlessly - # in the ioloop. - http_client.fetch(url, callback) - ioloop.start() - -Most applications shouln't have to work with `StackContext` directly. -Here are a few rules of thumb for when it's necessary: - -* If you're writing an asynchronous library that doesn't rely on a - stack_context-aware library like `tornado.ioloop` or `tornado.iostream` - (for example, if you're writing a thread pool), use - `.stack_context.wrap()` before any asynchronous operations to capture the - stack context from where the operation was started. - -* If you're writing an asynchronous library that has some shared - resources (such as a connection pool), create those shared resources - within a ``with stack_context.NullContext():`` block. This will prevent - ``StackContexts`` from leaking from one request to another. - -* If you want to write something like an exception handler that will - persist across asynchronous calls, create a new `StackContext` (or - `ExceptionStackContext`), and make your asynchronous calls in a ``with`` - block that references your `StackContext`. -""" - -from __future__ import absolute_import, division, print_function, with_statement - -import sys -import threading - -from .util import raise_exc_info - - -class StackContextInconsistentError(Exception): - pass - - -class _State(threading.local): - def __init__(self): - self.contexts = (tuple(), None) -_state = _State() - - -class StackContext(object): - """Establishes the given context as a StackContext that will be transferred. - - Note that the parameter is a callable that returns a context - manager, not the context itself. That is, where for a - non-transferable context manager you would say:: - - with my_context(): - - StackContext takes the function itself rather than its result:: - - with StackContext(my_context): - - The result of ``with StackContext() as cb:`` is a deactivation - callback. Run this callback when the StackContext is no longer - needed to ensure that it is not propagated any further (note that - deactivating a context does not affect any instances of that - context that are currently pending). This is an advanced feature - and not necessary in most applications. - """ - def __init__(self, context_factory): - self.context_factory = context_factory - self.contexts = [] - self.active = True - - def _deactivate(self): - self.active = False - - # StackContext protocol - def enter(self): - context = self.context_factory() - self.contexts.append(context) - context.__enter__() - - def exit(self, type, value, traceback): - context = self.contexts.pop() - context.__exit__(type, value, traceback) - - # Note that some of this code is duplicated in ExceptionStackContext - # below. ExceptionStackContext is more common and doesn't need - # the full generality of this class. - def __enter__(self): - self.old_contexts = _state.contexts - self.new_contexts = (self.old_contexts[0] + (self,), self) - _state.contexts = self.new_contexts - - try: - self.enter() - except: - _state.contexts = self.old_contexts - raise - - return self._deactivate - - def __exit__(self, type, value, traceback): - try: - self.exit(type, value, traceback) - finally: - final_contexts = _state.contexts - _state.contexts = self.old_contexts - - # Generator coroutines and with-statements with non-local - # effects interact badly. Check here for signs of - # the stack getting out of sync. - # Note that this check comes after restoring _state.context - # so that if it fails things are left in a (relatively) - # consistent state. - if final_contexts is not self.new_contexts: - raise StackContextInconsistentError( - 'stack_context inconsistency (may be caused by yield ' - 'within a "with StackContext" block)') - - # Break up a reference to itself to allow for faster GC on CPython. - self.new_contexts = None - - -class ExceptionStackContext(object): - """Specialization of StackContext for exception handling. - - The supplied ``exception_handler`` function will be called in the - event of an uncaught exception in this context. The semantics are - similar to a try/finally clause, and intended use cases are to log - an error, close a socket, or similar cleanup actions. The - ``exc_info`` triple ``(type, value, traceback)`` will be passed to the - exception_handler function. - - If the exception handler returns true, the exception will be - consumed and will not be propagated to other exception handlers. - """ - def __init__(self, exception_handler): - self.exception_handler = exception_handler - self.active = True - - def _deactivate(self): - self.active = False - - def exit(self, type, value, traceback): - if type is not None: - return self.exception_handler(type, value, traceback) - - def __enter__(self): - self.old_contexts = _state.contexts - self.new_contexts = (self.old_contexts[0], self) - _state.contexts = self.new_contexts - - return self._deactivate - - def __exit__(self, type, value, traceback): - try: - if type is not None: - return self.exception_handler(type, value, traceback) - finally: - final_contexts = _state.contexts - _state.contexts = self.old_contexts - - if final_contexts is not self.new_contexts: - raise StackContextInconsistentError( - 'stack_context inconsistency (may be caused by yield ' - 'within a "with StackContext" block)') - - # Break up a reference to itself to allow for faster GC on CPython. - self.new_contexts = None - - -class NullContext(object): - """Resets the `StackContext`. - - Useful when creating a shared resource on demand (e.g. an - `.AsyncHTTPClient`) where the stack that caused the creating is - not relevant to future operations. - """ - def __enter__(self): - self.old_contexts = _state.contexts - _state.contexts = (tuple(), None) - - def __exit__(self, type, value, traceback): - _state.contexts = self.old_contexts - - -def _remove_deactivated(contexts): - """Remove deactivated handlers from the chain""" - # Clean ctx handlers - stack_contexts = tuple([h for h in contexts[0] if h.active]) - - # Find new head - head = contexts[1] - while head is not None and not head.active: - head = head.old_contexts[1] - - # Process chain - ctx = head - while ctx is not None: - parent = ctx.old_contexts[1] - - while parent is not None: - if parent.active: - break - ctx.old_contexts = parent.old_contexts - parent = parent.old_contexts[1] - - ctx = parent - - return (stack_contexts, head) - - -def wrap(fn): - """Returns a callable object that will restore the current `StackContext` - when executed. - - Use this whenever saving a callback to be executed later in a - different execution context (either in a different thread or - asynchronously in the same thread). - """ - # Check if function is already wrapped - if fn is None or hasattr(fn, '_wrapped'): - return fn - - # Capture current stack head - # TODO: Any other better way to store contexts and update them in wrapped function? - cap_contexts = [_state.contexts] - - def wrapped(*args, **kwargs): - ret = None - try: - # Capture old state - current_state = _state.contexts - - # Remove deactivated items - cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0]) - - # Force new state - _state.contexts = contexts - - # Current exception - exc = (None, None, None) - top = None - - # Apply stack contexts - last_ctx = 0 - stack = contexts[0] - - # Apply state - for n in stack: - try: - n.enter() - last_ctx += 1 - except: - # Exception happened. Record exception info and store top-most handler - exc = sys.exc_info() - top = n.old_contexts[1] - - # Execute callback if no exception happened while restoring state - if top is None: - try: - ret = fn(*args, **kwargs) - except: - exc = sys.exc_info() - top = contexts[1] - - # If there was exception, try to handle it by going through the exception chain - if top is not None: - exc = _handle_exception(top, exc) - else: - # Otherwise take shorter path and run stack contexts in reverse order - while last_ctx > 0: - last_ctx -= 1 - c = stack[last_ctx] - - try: - c.exit(*exc) - except: - exc = sys.exc_info() - top = c.old_contexts[1] - break - else: - top = None - - # If if exception happened while unrolling, take longer exception handler path - if top is not None: - exc = _handle_exception(top, exc) - - # If exception was not handled, raise it - if exc != (None, None, None): - raise_exc_info(exc) - finally: - _state.contexts = current_state - return ret - - wrapped._wrapped = True - return wrapped - - -def _handle_exception(tail, exc): - while tail is not None: - try: - if tail.exit(*exc): - exc = (None, None, None) - except: - exc = sys.exc_info() - - tail = tail.old_contexts[1] - - return exc - - -def run_with_stack_context(context, func): - """Run a coroutine ``func`` in the given `StackContext`. - - It is not safe to have a ``yield`` statement within a ``with StackContext`` - block, so it is difficult to use stack context with `.gen.coroutine`. - This helper function runs the function in the correct context while - keeping the ``yield`` and ``with`` statements syntactically separate. - - Example:: - - @gen.coroutine - def incorrect(): - with StackContext(ctx): - # ERROR: this will raise StackContextInconsistentError - yield other_coroutine() - - @gen.coroutine - def correct(): - yield run_with_stack_context(StackContext(ctx), other_coroutine) - - .. versionadded:: 3.1 - """ - with context: - return func() diff --git a/zmq/eventloop/minitornado/util.py b/zmq/eventloop/minitornado/util.py deleted file mode 100644 index c1e2eb9..0000000 --- a/zmq/eventloop/minitornado/util.py +++ /dev/null @@ -1,184 +0,0 @@ -"""Miscellaneous utility functions and classes. - -This module is used internally by Tornado. It is not necessarily expected -that the functions and classes defined here will be useful to other -applications, but they are documented here in case they are. - -The one public-facing part of this module is the `Configurable` class -and its `~Configurable.configure` method, which becomes a part of the -interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`, -and `.Resolver`. -""" - -from __future__ import absolute_import, division, print_function, with_statement - -import sys - - -def import_object(name): - """Imports an object by name. - - import_object('x') is equivalent to 'import x'. - import_object('x.y.z') is equivalent to 'from x.y import z'. - - >>> import tornado.escape - >>> import_object('tornado.escape') is tornado.escape - True - >>> import_object('tornado.escape.utf8') is tornado.escape.utf8 - True - >>> import_object('tornado') is tornado - True - >>> import_object('tornado.missing_module') - Traceback (most recent call last): - ... - ImportError: No module named missing_module - """ - if name.count('.') == 0: - return __import__(name, None, None) - - parts = name.split('.') - obj = __import__('.'.join(parts[:-1]), None, None, [parts[-1]], 0) - try: - return getattr(obj, parts[-1]) - except AttributeError: - raise ImportError("No module named %s" % parts[-1]) - - -# Fake unicode literal support: Python 3.2 doesn't have the u'' marker for -# literal strings, and alternative solutions like "from __future__ import -# unicode_literals" have other problems (see PEP 414). u() can be applied -# to ascii strings that include \u escapes (but they must not contain -# literal non-ascii characters). -if type('') is not type(b''): - def u(s): - return s - bytes_type = bytes - unicode_type = str - basestring_type = str -else: - def u(s): - return s.decode('unicode_escape') - bytes_type = str - unicode_type = unicode - basestring_type = basestring - - -if sys.version_info > (3,): - exec(""" -def raise_exc_info(exc_info): - raise exc_info[1].with_traceback(exc_info[2]) - -def exec_in(code, glob, loc=None): - if isinstance(code, str): - code = compile(code, '', 'exec', dont_inherit=True) - exec(code, glob, loc) -""") -else: - exec(""" -def raise_exc_info(exc_info): - raise exc_info[0], exc_info[1], exc_info[2] - -def exec_in(code, glob, loc=None): - if isinstance(code, basestring): - # exec(string) inherits the caller's future imports; compile - # the string first to prevent that. - code = compile(code, '', 'exec', dont_inherit=True) - exec code in glob, loc -""") - - -class Configurable(object): - """Base class for configurable interfaces. - - A configurable interface is an (abstract) class whose constructor - acts as a factory function for one of its implementation subclasses. - The implementation subclass as well as optional keyword arguments to - its initializer can be set globally at runtime with `configure`. - - By using the constructor as the factory method, the interface - looks like a normal class, `isinstance` works as usual, etc. This - pattern is most useful when the choice of implementation is likely - to be a global decision (e.g. when `~select.epoll` is available, - always use it instead of `~select.select`), or when a - previously-monolithic class has been split into specialized - subclasses. - - Configurable subclasses must define the class methods - `configurable_base` and `configurable_default`, and use the instance - method `initialize` instead of ``__init__``. - """ - __impl_class = None - __impl_kwargs = None - - def __new__(cls, **kwargs): - base = cls.configurable_base() - args = {} - if cls is base: - impl = cls.configured_class() - if base.__impl_kwargs: - args.update(base.__impl_kwargs) - else: - impl = cls - args.update(kwargs) - instance = super(Configurable, cls).__new__(impl) - # initialize vs __init__ chosen for compatiblity with AsyncHTTPClient - # singleton magic. If we get rid of that we can switch to __init__ - # here too. - instance.initialize(**args) - return instance - - @classmethod - def configurable_base(cls): - """Returns the base class of a configurable hierarchy. - - This will normally return the class in which it is defined. - (which is *not* necessarily the same as the cls classmethod parameter). - """ - raise NotImplementedError() - - @classmethod - def configurable_default(cls): - """Returns the implementation class to be used if none is configured.""" - raise NotImplementedError() - - def initialize(self): - """Initialize a `Configurable` subclass instance. - - Configurable classes should use `initialize` instead of ``__init__``. - """ - - @classmethod - def configure(cls, impl, **kwargs): - """Sets the class to use when the base class is instantiated. - - Keyword arguments will be saved and added to the arguments passed - to the constructor. This can be used to set global defaults for - some parameters. - """ - base = cls.configurable_base() - if isinstance(impl, (unicode_type, bytes_type)): - impl = import_object(impl) - if impl is not None and not issubclass(impl, cls): - raise ValueError("Invalid subclass of %s" % cls) - base.__impl_class = impl - base.__impl_kwargs = kwargs - - @classmethod - def configured_class(cls): - """Returns the currently configured class.""" - base = cls.configurable_base() - if cls.__impl_class is None: - base.__impl_class = cls.configurable_default() - return base.__impl_class - - @classmethod - def _save_configuration(cls): - base = cls.configurable_base() - return (base.__impl_class, base.__impl_kwargs) - - @classmethod - def _restore_configuration(cls, saved): - base = cls.configurable_base() - base.__impl_class = saved[0] - base.__impl_kwargs = saved[1] - diff --git a/zmq/eventloop/zmqstream.py b/zmq/eventloop/zmqstream.py deleted file mode 100644 index 86a97e4..0000000 --- a/zmq/eventloop/zmqstream.py +++ /dev/null @@ -1,529 +0,0 @@ -# -# Copyright 2009 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""A utility class to send to and recv from a non-blocking socket.""" - -from __future__ import with_statement - -import sys - -import zmq -from zmq.utils import jsonapi - -try: - import cPickle as pickle -except ImportError: - import pickle - -from .ioloop import IOLoop - -try: - # gen_log will only import from >= 3.0 - from tornado.log import gen_log - from tornado import stack_context -except ImportError: - from .minitornado.log import gen_log - from .minitornado import stack_context - -try: - from queue import Queue -except ImportError: - from Queue import Queue - -from zmq.utils.strtypes import bytes, unicode, basestring - -try: - callable -except NameError: - callable = lambda obj: hasattr(obj, '__call__') - - -class ZMQStream(object): - """A utility class to register callbacks when a zmq socket sends and receives - - For use with zmq.eventloop.ioloop - - There are three main methods - - Methods: - - * **on_recv(callback, copy=True):** - register a callback to be run every time the socket has something to receive - * **on_send(callback):** - register a callback to be run every time you call send - * **send(self, msg, flags=0, copy=False, callback=None):** - perform a send that will trigger the callback - if callback is passed, on_send is also called. - - There are also send_multipart(), send_json(), send_pyobj() - - Three other methods for deactivating the callbacks: - - * **stop_on_recv():** - turn off the recv callback - * **stop_on_send():** - turn off the send callback - - which simply call ``on_(None)``. - - The entire socket interface, excluding direct recv methods, is also - provided, primarily through direct-linking the methods. - e.g. - - >>> stream.bind is stream.socket.bind - True - - """ - - socket = None - io_loop = None - poller = None - - def __init__(self, socket, io_loop=None): - self.socket = socket - self.io_loop = io_loop or IOLoop.instance() - self.poller = zmq.Poller() - - self._send_queue = Queue() - self._recv_callback = None - self._send_callback = None - self._close_callback = None - self._recv_copy = False - self._flushed = False - - self._state = self.io_loop.ERROR - self._init_io_state() - - # shortcircuit some socket methods - self.bind = self.socket.bind - self.bind_to_random_port = self.socket.bind_to_random_port - self.connect = self.socket.connect - self.setsockopt = self.socket.setsockopt - self.getsockopt = self.socket.getsockopt - self.setsockopt_string = self.socket.setsockopt_string - self.getsockopt_string = self.socket.getsockopt_string - self.setsockopt_unicode = self.socket.setsockopt_unicode - self.getsockopt_unicode = self.socket.getsockopt_unicode - - - def stop_on_recv(self): - """Disable callback and automatic receiving.""" - return self.on_recv(None) - - def stop_on_send(self): - """Disable callback on sending.""" - return self.on_send(None) - - def stop_on_err(self): - """DEPRECATED, does nothing""" - gen_log.warn("on_err does nothing, and will be removed") - - def on_err(self, callback): - """DEPRECATED, does nothing""" - gen_log.warn("on_err does nothing, and will be removed") - - def on_recv(self, callback, copy=True): - """Register a callback for when a message is ready to recv. - - There can be only one callback registered at a time, so each - call to `on_recv` replaces previously registered callbacks. - - on_recv(None) disables recv event polling. - - Use on_recv_stream(callback) instead, to register a callback that will receive - both this ZMQStream and the message, instead of just the message. - - Parameters - ---------- - - callback : callable - callback must take exactly one argument, which will be a - list, as returned by socket.recv_multipart() - if callback is None, recv callbacks are disabled. - copy : bool - copy is passed directly to recv, so if copy is False, - callback will receive Message objects. If copy is True, - then callback will receive bytes/str objects. - - Returns : None - """ - - self._check_closed() - assert callback is None or callable(callback) - self._recv_callback = stack_context.wrap(callback) - self._recv_copy = copy - if callback is None: - self._drop_io_state(self.io_loop.READ) - else: - self._add_io_state(self.io_loop.READ) - - def on_recv_stream(self, callback, copy=True): - """Same as on_recv, but callback will get this stream as first argument - - callback must take exactly two arguments, as it will be called as:: - - callback(stream, msg) - - Useful when a single callback should be used with multiple streams. - """ - if callback is None: - self.stop_on_recv() - else: - self.on_recv(lambda msg: callback(self, msg), copy=copy) - - def on_send(self, callback): - """Register a callback to be called on each send - - There will be two arguments:: - - callback(msg, status) - - * `msg` will be the list of sendable objects that was just sent - * `status` will be the return result of socket.send_multipart(msg) - - MessageTracker or None. - - Non-copying sends return a MessageTracker object whose - `done` attribute will be True when the send is complete. - This allows users to track when an object is safe to write to - again. - - The second argument will always be None if copy=True - on the send. - - Use on_send_stream(callback) to register a callback that will be passed - this ZMQStream as the first argument, in addition to the other two. - - on_send(None) disables recv event polling. - - Parameters - ---------- - - callback : callable - callback must take exactly two arguments, which will be - the message being sent (always a list), - and the return result of socket.send_multipart(msg) - - MessageTracker or None. - - if callback is None, send callbacks are disabled. - """ - - self._check_closed() - assert callback is None or callable(callback) - self._send_callback = stack_context.wrap(callback) - - - def on_send_stream(self, callback): - """Same as on_send, but callback will get this stream as first argument - - Callback will be passed three arguments:: - - callback(stream, msg, status) - - Useful when a single callback should be used with multiple streams. - """ - if callback is None: - self.stop_on_send() - else: - self.on_send(lambda msg, status: callback(self, msg, status)) - - - def send(self, msg, flags=0, copy=True, track=False, callback=None): - """Send a message, optionally also register a new callback for sends. - See zmq.socket.send for details. - """ - return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback) - - def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None): - """Send a multipart message, optionally also register a new callback for sends. - See zmq.socket.send_multipart for details. - """ - kwargs = dict(flags=flags, copy=copy, track=track) - self._send_queue.put((msg, kwargs)) - callback = callback or self._send_callback - if callback is not None: - self.on_send(callback) - else: - # noop callback - self.on_send(lambda *args: None) - self._add_io_state(self.io_loop.WRITE) - - def send_string(self, u, flags=0, encoding='utf-8', callback=None): - """Send a unicode message with an encoding. - See zmq.socket.send_unicode for details. - """ - if not isinstance(u, basestring): - raise TypeError("unicode/str objects only") - return self.send(u.encode(encoding), flags=flags, callback=callback) - - send_unicode = send_string - - def send_json(self, obj, flags=0, callback=None): - """Send json-serialized version of an object. - See zmq.socket.send_json for details. - """ - if jsonapi is None: - raise ImportError('jsonlib{1,2}, json or simplejson library is required.') - else: - msg = jsonapi.dumps(obj) - return self.send(msg, flags=flags, callback=callback) - - def send_pyobj(self, obj, flags=0, protocol=-1, callback=None): - """Send a Python object as a message using pickle to serialize. - - See zmq.socket.send_json for details. - """ - msg = pickle.dumps(obj, protocol) - return self.send(msg, flags, callback=callback) - - def _finish_flush(self): - """callback for unsetting _flushed flag.""" - self._flushed = False - - def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None): - """Flush pending messages. - - This method safely handles all pending incoming and/or outgoing messages, - bypassing the inner loop, passing them to the registered callbacks. - - A limit can be specified, to prevent blocking under high load. - - flush will return the first time ANY of these conditions are met: - * No more events matching the flag are pending. - * the total number of events handled reaches the limit. - - Note that if ``flag|POLLIN != 0``, recv events will be flushed even if no callback - is registered, unlike normal IOLoop operation. This allows flush to be - used to remove *and ignore* incoming messages. - - Parameters - ---------- - flag : int, default=POLLIN|POLLOUT - 0MQ poll flags. - If flag|POLLIN, recv events will be flushed. - If flag|POLLOUT, send events will be flushed. - Both flags can be set at once, which is the default. - limit : None or int, optional - The maximum number of messages to send or receive. - Both send and recv count against this limit. - - Returns - ------- - int : count of events handled (both send and recv) - """ - self._check_closed() - # unset self._flushed, so callbacks will execute, in case flush has - # already been called this iteration - already_flushed = self._flushed - self._flushed = False - # initialize counters - count = 0 - def update_flag(): - """Update the poll flag, to prevent registering POLLOUT events - if we don't have pending sends.""" - return flag & zmq.POLLIN | (self.sending() and flag & zmq.POLLOUT) - flag = update_flag() - if not flag: - # nothing to do - return 0 - self.poller.register(self.socket, flag) - events = self.poller.poll(0) - while events and (not limit or count < limit): - s,event = events[0] - if event & zmq.POLLIN: # receiving - self._handle_recv() - count += 1 - if self.socket is None: - # break if socket was closed during callback - break - if event & zmq.POLLOUT and self.sending(): - self._handle_send() - count += 1 - if self.socket is None: - # break if socket was closed during callback - break - - flag = update_flag() - if flag: - self.poller.register(self.socket, flag) - events = self.poller.poll(0) - else: - events = [] - if count: # only bypass loop if we actually flushed something - # skip send/recv callbacks this iteration - self._flushed = True - # reregister them at the end of the loop - if not already_flushed: # don't need to do it again - self.io_loop.add_callback(self._finish_flush) - elif already_flushed: - self._flushed = True - - # update ioloop poll state, which may have changed - self._rebuild_io_state() - return count - - def set_close_callback(self, callback): - """Call the given callback when the stream is closed.""" - self._close_callback = stack_context.wrap(callback) - - def close(self, linger=None): - """Close this stream.""" - if self.socket is not None: - self.io_loop.remove_handler(self.socket) - self.socket.close(linger) - self.socket = None - if self._close_callback: - self._run_callback(self._close_callback) - - def receiving(self): - """Returns True if we are currently receiving from the stream.""" - return self._recv_callback is not None - - def sending(self): - """Returns True if we are currently sending to the stream.""" - return not self._send_queue.empty() - - def closed(self): - return self.socket is None - - def _run_callback(self, callback, *args, **kwargs): - """Wrap running callbacks in try/except to allow us to - close our socket.""" - try: - # Use a NullContext to ensure that all StackContexts are run - # inside our blanket exception handler rather than outside. - with stack_context.NullContext(): - callback(*args, **kwargs) - except: - gen_log.error("Uncaught exception, closing connection.", - exc_info=True) - # Close the socket on an uncaught exception from a user callback - # (It would eventually get closed when the socket object is - # gc'd, but we don't want to rely on gc happening before we - # run out of file descriptors) - self.close() - # Re-raise the exception so that IOLoop.handle_callback_exception - # can see it and log the error - raise - - def _handle_events(self, fd, events): - """This method is the actual handler for IOLoop, that gets called whenever - an event on my socket is posted. It dispatches to _handle_recv, etc.""" - # print "handling events" - if not self.socket: - gen_log.warning("Got events for closed stream %s", fd) - return - try: - # dispatch events: - if events & IOLoop.ERROR: - gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense") - return - if events & IOLoop.READ: - self._handle_recv() - if not self.socket: - return - if events & IOLoop.WRITE: - self._handle_send() - if not self.socket: - return - - # rebuild the poll state - self._rebuild_io_state() - except: - gen_log.error("Uncaught exception, closing connection.", - exc_info=True) - self.close() - raise - - def _handle_recv(self): - """Handle a recv event.""" - if self._flushed: - return - try: - msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy) - except zmq.ZMQError as e: - if e.errno == zmq.EAGAIN: - # state changed since poll event - pass - else: - gen_log.error("RECV Error: %s"%zmq.strerror(e.errno)) - else: - if self._recv_callback: - callback = self._recv_callback - # self._recv_callback = None - self._run_callback(callback, msg) - - # self.update_state() - - - def _handle_send(self): - """Handle a send event.""" - if self._flushed: - return - if not self.sending(): - gen_log.error("Shouldn't have handled a send event") - return - - msg, kwargs = self._send_queue.get() - try: - status = self.socket.send_multipart(msg, **kwargs) - except zmq.ZMQError as e: - gen_log.error("SEND Error: %s", e) - status = e - if self._send_callback: - callback = self._send_callback - self._run_callback(callback, msg, status) - - # self.update_state() - - def _check_closed(self): - if not self.socket: - raise IOError("Stream is closed") - - def _rebuild_io_state(self): - """rebuild io state based on self.sending() and receiving()""" - if self.socket is None: - return - state = self.io_loop.ERROR - if self.receiving(): - state |= self.io_loop.READ - if self.sending(): - state |= self.io_loop.WRITE - if state != self._state: - self._state = state - self._update_handler(state) - - def _add_io_state(self, state): - """Add io_state to poller.""" - if not self._state & state: - self._state = self._state | state - self._update_handler(self._state) - - def _drop_io_state(self, state): - """Stop poller from watching an io_state.""" - if self._state & state: - self._state = self._state & (~state) - self._update_handler(self._state) - - def _update_handler(self, state): - """Update IOLoop handler with state.""" - if self.socket is None: - return - self.io_loop.update_handler(self.socket, state) - - def _init_io_state(self): - """initialize the ioloop event handler""" - with stack_context.NullContext(): - self.io_loop.add_handler(self.socket, self._handle_events, self._state) - -- cgit v1.2.3