Initial debianization
[python_taskthread.git] / taskthread / __init__.py
1 # Copyright 2013 Hewlett-Packard Development Company, L.P.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 import logging
15 import threading
16
17 __version__ = '1.4'
18
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskInProcessException(BaseException):
24     pass
25
26
27 class TaskThread(threading.Thread):
28     """
29     A thread object that repeats a task.
30
31     Usage example::
32
33         from taskthread import TaskThread
34
35         import time
36
37         def my_task(*args, **kwargs):
38             print args, kwargs
39
40         task_thread = TaskThread(my_task)
41         task_thread.start()
42         for i in xrange(10):
43             task_thread.run_task()
44             task_thread.join_task()
45         task_thread.join()
46
47     .. note:: If :py:meth:`~TaskThread.run_task` is
48         invoked while run_task is in progress,
49         :py:class:`~.TaskInProcessException` will
50         be raised.
51
52     :param task:
53         A ``function``. This param is the task to execute when
54          run_task is called.
55     :param event:
56         A ``threading.Event``. This event will be set when run_task
57          is called. The default value is a new event, but may be
58          specified for testing purposes.
59     """
60
61     daemon = True
62     '''
63     Threads marked as daemon will be terminated.
64     '''
65     def __init__(self, task, event=threading.Event(),
66                  *args, **kwargs):
67         super(TaskThread, self).__init__()
68         self.task = task
69         self.task_event = event
70         self.running = True
71         self.running_lock = threading.Lock()
72         self.in_task = False
73         self.task_complete = threading.Event()
74         self.args = args
75         self.kwargs = kwargs
76
77     def run(self):
78         """
79         Called by threading.Thread, this runs in the new thread.
80         """
81         while True:
82             self.task_event.wait()
83             if not self.running:
84                 logger.debug("TaskThread exiting")
85                 return
86             logger.debug("TaskThread starting task")
87             with self.running_lock:
88                 self.task_event.clear()
89             self.task_complete.clear()
90             self.task(*self.args, **self.kwargs)
91             with self.running_lock:
92                 self.in_task = False
93             self.task_complete.set()
94
95     def run_task(self, *args, **kwargs):
96         """
97         Run an instance of the task.
98
99         :param args:
100             The arguments to pass to the task.
101
102         :param kwargs:
103             The keyword arguments to pass to the task.
104         """
105         # Don't allow this call if the thread is currently
106         # in a task.
107         with self.running_lock:
108             if self.in_task:
109                 raise TaskInProcessException()
110             self.in_task = True
111         logger.debug("Waking up the thread")
112         self.args = args
113         self.kwargs = kwargs
114         # Wake up the thread to do it's thing
115         self.task_event.set()
116
117     def join_task(self, time_out):
118         """
119         Wait for the currently running task to complete.
120
121         :param time_out:
122             An ``int``. The amount of time to wait for the
123             task to finish.
124         """
125         with self.running_lock:
126             if not self.in_task:
127                 return
128
129         success = self.task_complete.wait(time_out)
130         if success:
131             self.task_complete.clear()
132         return success
133
134     def join(self, timeout=None):
135         """
136         Wait for the task to finish
137         """
138         self.running = False
139         self.task_event.set()
140         super(TaskThread, self).join(timeout=timeout)
141
142
143 class TimerTask(object):
144     """
145     An object that executes a commit function at a given interval.
146     This class is driven by a TaskThread. A new TaskThread will be
147     created the first time :py:meth:`.~start` is called. All
148     subsequent calls to start will reuse the same thread.
149
150     Usage example::
151
152         from taskthread import TimerTask
153         import time
154
155         count = 0
156         def get_count():
157             return count
158         def execute():
159             print "Count: %d" % count
160
161         task = TimerTask(execute,
162                          timeout=10,
163                          count_fcn=get_count,
164                          threshold=1)
165
166         task.start()
167
168         for i in xrange(100000):
169             count += 1
170             time.sleep(1)
171         task.stop()
172         count = 0
173         task.start()
174         for i in xrange(100000):
175             count += 1
176             time.sleep(1)
177         task.shutdown()
178
179     :param execute_fcn:
180         A `function`. This function will be executed on each time interval.
181
182     :param delay:
183         An `int`. The delay in **seconds** invocations of
184         `execute_fcn`. Default: `10`.
185
186     :param count_fcn:
187         A `function`. This function returns a current count. If the count
188         has not changed more the `threshold` since the last invocation of
189         `execute_fcn`, `execute_fcn` will not be called again. If not
190         specified, `execute_fcn` will be called each time the timer fires.
191         **Optional**. If count_fcn is specified, ``threshold`` is required.
192
193     :param threshold:
194         An `int`. Specifies the minimum delta in `count_fcn` that must be
195         met for `execute_fcn` to be invoked. **Optional**. Must be
196         specified in conjunction with `count_fcn`.
197
198     """
199     def __init__(self, execute_fcn, delay=10, count_fcn=None, threshold=None):
200         self.running = True
201         self.execute_fcn = execute_fcn
202         self.last_count = 0
203         self.event = threading.Event()
204         self.delay = delay
205         self.thread = None
206         self.running_lock = threading.RLock()
207         if bool(threshold) != bool(count_fcn):
208             raise ValueError("Must specify threshold "
209                              "and count_fcn, or neither")
210
211         self.count_fcn = count_fcn
212         self.threshold = threshold
213
214     def start(self):
215         """
216         Start the task. This starts a :py:class:`.~TaskThread`, and starts
217         running run_threshold_timer on the thread.
218
219         """
220         if not self.thread:
221             logger.debug('Starting up the taskthread')
222             self.thread = TaskThread(self._run_threshold_timer)
223             self.thread.start()
224
225         if self.threshold:
226             self.last_count = 0
227
228         logger.debug('Running the task')
229         self.running = True
230         self.thread.run_task()
231
232     def stop(self):
233         """
234         Stop the task, leaving the :py:class:`.~TaskThread` running
235         so start can be called again.
236
237         """
238         logger.debug('Stopping the task')
239         wait = False
240         with self.running_lock:
241             if self.running:
242                 wait = True
243                 self.running = False
244         if wait:
245             self.event.set()
246             self.thread.join_task(2)
247
248     def shutdown(self):
249         """
250         Close down the task thread and stop the task if it is running.
251
252         """
253         logger.debug('Shutting down the task')
254         self.stop()
255         self.thread.join(2)
256
257     def _exec_if_threshold_met(self):
258         new_count = self.count_fcn()
259         logger.debug('new_count: %d' % new_count)
260         if new_count >= self.last_count + self.threshold:
261             self.execute_fcn()
262             self.last_count = new_count
263
264     def _exec(self):
265         if self.count_fcn:
266             self._exec_if_threshold_met()
267         else:
268             self.execute_fcn()
269
270     def _wait(self):
271         self.event.wait(timeout=self.delay)
272         self.event.clear()
273         logger.debug('Task woken up')
274
275     def _exit_loop(self):
276         """
277         If self.running is false, it means the task should shut down.
278         """
279         exit_loop = False
280         with self.running_lock:
281             if not self.running:
282                 exit_loop = True
283                 logger.debug('Task shutting down')
284         return exit_loop
285
286     def _run_threshold_timer(self):
287         """
288         Main loop of the timer task
289
290         """
291         logger.debug('In Task')
292         while True:
293             self._wait()
294             if self._exit_loop():
295                 return
296             self._exec()