diff options
Diffstat (limited to 'src/leap')
-rw-r--r-- | src/leap/bitmask/backend/taskthread.py | 296 |
1 files changed, 0 insertions, 296 deletions
diff --git a/src/leap/bitmask/backend/taskthread.py b/src/leap/bitmask/backend/taskthread.py deleted file mode 100644 index a734a829..00000000 --- a/src/leap/bitmask/backend/taskthread.py +++ /dev/null @@ -1,296 +0,0 @@ -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import logging -import threading - -__version__ = '1.4' - - -logger = logging.getLogger(__name__) - - -class TaskInProcessException(BaseException): - pass - - -class TaskThread(threading.Thread): - """ - A thread object that repeats a task. - - Usage example:: - - from taskthread import TaskThread - - import time - - def my_task(*args, **kwargs): - print args, kwargs - - task_thread = TaskThread(my_task) - task_thread.start() - for i in xrange(10): - task_thread.run_task() - task_thread.join_task() - task_thread.join() - - .. note:: If :py:meth:`~TaskThread.run_task` is - invoked while run_task is in progress, - :py:class:`~.TaskInProcessException` will - be raised. - - :param task: - A ``function``. This param is the task to execute when - run_task is called. - :param event: - A ``threading.Event``. This event will be set when run_task - is called. The default value is a new event, but may be - specified for testing purposes. - """ - - daemon = True - ''' - Threads marked as daemon will be terminated. - ''' - def __init__(self, task, event=threading.Event(), - *args, **kwargs): - super(TaskThread, self).__init__() - self.task = task - self.task_event = event - self.running = True - self.running_lock = threading.Lock() - self.in_task = False - self.task_complete = threading.Event() - self.args = args - self.kwargs = kwargs - - def run(self): - """ - Called by threading.Thread, this runs in the new thread. - """ - while True: - self.task_event.wait() - if not self.running: - logger.debug("TaskThread exiting") - return - logger.debug("TaskThread starting task") - with self.running_lock: - self.task_event.clear() - self.task_complete.clear() - self.task(*self.args, **self.kwargs) - with self.running_lock: - self.in_task = False - self.task_complete.set() - - def run_task(self, *args, **kwargs): - """ - Run an instance of the task. - - :param args: - The arguments to pass to the task. - - :param kwargs: - The keyword arguments to pass to the task. - """ - # Don't allow this call if the thread is currently - # in a task. - with self.running_lock: - if self.in_task: - raise TaskInProcessException() - self.in_task = True - logger.debug("Waking up the thread") - self.args = args - self.kwargs = kwargs - # Wake up the thread to do it's thing - self.task_event.set() - - def join_task(self, time_out): - """ - Wait for the currently running task to complete. - - :param time_out: - An ``int``. The amount of time to wait for the - task to finish. - """ - with self.running_lock: - if not self.in_task: - return - - success = self.task_complete.wait(time_out) - if success: - self.task_complete.clear() - return success - - def join(self, timeout=None): - """ - Wait for the task to finish - """ - self.running = False - self.task_event.set() - super(TaskThread, self).join(timeout=timeout) - - -class TimerTask(object): - """ - An object that executes a commit function at a given interval. - This class is driven by a TaskThread. A new TaskThread will be - created the first time :py:meth:`.~start` is called. All - subsequent calls to start will reuse the same thread. - - Usage example:: - - from taskthread import TimerTask - import time - - count = 0 - def get_count(): - return count - def execute(): - print "Count: %d" % count - - task = TimerTask(execute, - timeout=10, - count_fcn=get_count, - threshold=1) - - task.start() - - for i in xrange(100000): - count += 1 - time.sleep(1) - task.stop() - count = 0 - task.start() - for i in xrange(100000): - count += 1 - time.sleep(1) - task.shutdown() - - :param execute_fcn: - A `function`. This function will be executed on each time interval. - - :param delay: - An `int`. The delay in **seconds** invocations of - `execute_fcn`. Default: `10`. - - :param count_fcn: - A `function`. This function returns a current count. If the count - has not changed more the `threshold` since the last invocation of - `execute_fcn`, `execute_fcn` will not be called again. If not - specified, `execute_fcn` will be called each time the timer fires. - **Optional**. If count_fcn is specified, ``threshold`` is required. - - :param threshold: - An `int`. Specifies the minimum delta in `count_fcn` that must be - met for `execute_fcn` to be invoked. **Optional**. Must be - specified in conjunction with `count_fcn`. - - """ - def __init__(self, execute_fcn, delay=10, count_fcn=None, threshold=None): - self.running = True - self.execute_fcn = execute_fcn - self.last_count = 0 - self.event = threading.Event() - self.delay = delay - self.thread = None - self.running_lock = threading.RLock() - if bool(threshold) != bool(count_fcn): - raise ValueError("Must specify threshold " - "and count_fcn, or neither") - - self.count_fcn = count_fcn - self.threshold = threshold - - def start(self): - """ - Start the task. This starts a :py:class:`.~TaskThread`, and starts - running run_threshold_timer on the thread. - - """ - if not self.thread: - logger.debug('Starting up the taskthread') - self.thread = TaskThread(self._run_threshold_timer) - self.thread.start() - - if self.threshold: - self.last_count = 0 - - logger.debug('Running the task') - self.running = True - self.thread.run_task() - - def stop(self): - """ - Stop the task, leaving the :py:class:`.~TaskThread` running - so start can be called again. - - """ - logger.debug('Stopping the task') - wait = False - with self.running_lock: - if self.running: - wait = True - self.running = False - if wait: - self.event.set() - self.thread.join_task(2) - - def shutdown(self): - """ - Close down the task thread and stop the task if it is running. - - """ - logger.debug('Shutting down the task') - self.stop() - self.thread.join(2) - - def _exec_if_threshold_met(self): - new_count = self.count_fcn() - logger.debug('new_count: %d' % new_count) - if new_count >= self.last_count + self.threshold: - self.execute_fcn() - self.last_count = new_count - - def _exec(self): - if self.count_fcn: - self._exec_if_threshold_met() - else: - self.execute_fcn() - - def _wait(self): - self.event.wait(timeout=self.delay) - self.event.clear() - logger.debug('Task woken up') - - def _exit_loop(self): - """ - If self.running is false, it means the task should shut down. - """ - exit_loop = False - with self.running_lock: - if not self.running: - exit_loop = True - logger.debug('Task shutting down') - return exit_loop - - def _run_threshold_timer(self): - """ - Main loop of the timer task - - """ - logger.debug('In Task') - while True: - self._wait() - if self._exit_loop(): - return - self._exec() |