summaryrefslogtreecommitdiff
path: root/taskthread/tests/unit/test_taskthread.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskthread/tests/unit/test_taskthread.py')
-rw-r--r--taskthread/tests/unit/test_taskthread.py315
1 files changed, 315 insertions, 0 deletions
diff --git a/taskthread/tests/unit/test_taskthread.py b/taskthread/tests/unit/test_taskthread.py
new file mode 100644
index 0000000..8256592
--- /dev/null
+++ b/taskthread/tests/unit/test_taskthread.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.:w
+
+
+import threading
+import unittest2 as unittest
+
+from mock import Mock, patch
+
+from taskthread import TaskThread, TaskInProcessException, TimerTask
+
+forever_event = threading.Event()
+
+
+def forever_function(*args, **kwargs):
+ forever_event.wait()
+ forever_event.clear()
+
+
+class TaskThreadTestCase(unittest.TestCase):
+ """
+ Tests for :py:class:`.TaskThread`.
+ """
+
+ def test___init__(self):
+ """
+ Test the __init__ method. It doesn't really do much.
+ """
+ task_thread = TaskThread(forever_function)
+ self.assertEqual(forever_function, task_thread.task)
+
+ def test_run_not_running(self):
+ """
+ Verifies that thread will shut down when running is false
+ """
+ event = Mock()
+ event.wait = Mock(side_effect=[True])
+ event.clear = Mock(side_effect=Exception("Should never be called"))
+ task_thread = TaskThread(forever_function,
+ event=event)
+ task_thread.running = False
+ task_thread.run()
+ event.wait.assert_called_once_with()
+
+ def test_run_executes_task(self):
+ event = Mock()
+ event.wait = Mock(side_effect=[True, True])
+
+ def stop_iteration(*args, **kwargs):
+ args[0].running = False
+
+ task_thread = TaskThread(stop_iteration,
+ event=event)
+
+ task_thread.args = [task_thread]
+ task_thread.kwargs = {'a': 2}
+ task_thread.in_task = True
+ task_thread.run()
+ self.assertEqual(False, task_thread.in_task)
+
+ def test_run_task(self):
+ event = Mock()
+ task_thread = TaskThread(forever_function,
+ event=event)
+ args = [1]
+ kwargs = {'a': 1}
+
+ task_thread.run_task(*args, **kwargs)
+ self.assertEqual(tuple(args), task_thread.args)
+ self.assertEqual(kwargs, task_thread.kwargs)
+ event.set.assert_called_once_with()
+
+ def test_run_task_task_in_progress(self):
+ event = Mock()
+ task_thread = TaskThread(forever_function,
+ event=event)
+ task_thread.in_task = True
+ self.assertRaises(TaskInProcessException, task_thread.run_task)
+
+ def test_join_task(self):
+ task_thread = TaskThread(forever_function)
+ task_thread.in_task = True
+ task_thread.task_complete = Mock()
+ task_thread.task_complete.wait = Mock(side_effect=[True])
+ success = task_thread.join_task(1)
+ self.assertTrue(success)
+
+ def test_join_task_not_running(self):
+ task_thread = TaskThread(forever_function)
+ task_thread.task_complete = Mock()
+ task_thread.wait =\
+ Mock(side_effect=Exception("Should never be called"))
+ task_thread.join_task(1)
+
+ def test_join(self):
+ task_thread = TaskThread(forever_function)
+ task_thread.start()
+ task_thread.run_task()
+ # Set the event so the task completes
+ forever_event.set()
+ task_thread.join_task(1)
+ task_thread.join(1)
+
+ def test_execute_multiple_tasks(self):
+ task_thread = TaskThread(forever_function)
+ task_thread.start()
+ task_thread.run_task()
+ # Set the event so the task completes
+ forever_event.set()
+ task_thread.join_task(1)
+ forever_event.set()
+ task_thread.join_task(1)
+ task_thread.join(1)
+
+
+def my_func():
+ pass
+
+
+class TimerTaskTestCase(unittest.TestCase):
+
+ def test___int__(self):
+
+ task = TimerTask(my_func,
+ delay=100)
+ self.assertEqual(my_func, task.execute_fcn)
+ self.assertEqual(100, task.delay)
+ self.assertIsNone(task.count_fcn)
+ self.assertIsNone(task.threshold)
+
+ def test___int__raises(self):
+ self.assertRaises(ValueError, TimerTask.__init__,
+ TimerTask(None),
+ my_func(),
+ count_fcn=Mock())
+
+ self.assertRaises(ValueError, TimerTask.__init__,
+ TimerTask(None),
+ my_func(),
+ threshold=Mock())
+
+ @patch('taskthread.TaskThread')
+ def test_start(self, TaskThreadMock):
+ task = TimerTask(my_func)
+ thread = TaskThreadMock.return_value
+
+ task.start()
+ self.assertTrue(task.running)
+ self.assertEqual(thread, task.thread)
+ thread.start.assert_called_once_with()
+ thread.run_task.assert_called_once_with()
+
+ @patch('taskthread.TaskThread')
+ def test_start_restarts(self, TaskThreadMock):
+ task = TimerTask(my_func, threshold=1, count_fcn=Mock())
+ thread = TaskThreadMock.return_value
+ task.last_count = 1
+ task.thread = thread
+
+ task.start()
+ self.assertEqual(0, task.last_count)
+ self.assertEqual(0, thread.start.called)
+ thread.run_task.assert_called_once_with()
+
+ @patch('taskthread.TaskThread')
+ def test_stop(self, TaskThreadMock):
+ running_lock = Mock()
+ running_lock.__enter__ = Mock()
+ running_lock.__exit__ = Mock()
+ task = TimerTask(my_func)
+ task.thread = TaskThreadMock.return_value
+ task.running = True
+ task.event = Mock()
+ task.running_lock = running_lock
+
+ task.stop()
+
+ self.assertEqual(False, task.running)
+ self.assertEqual(1, task.event.set.called)
+ running_lock.__enter__.assert_called_once_with()
+ running_lock.__exit__.assert_called_once_with(None, None, None)
+ task.thread.join_task.assert_called_once_with(2)
+
+ @patch('taskthread.TaskThread')
+ def test_stop_not_running(self, TaskThreadMock):
+ task = TimerTask(my_func)
+ task.thread = TaskThreadMock.return_value
+ task.running = False
+ task.event = Mock()
+
+ task.stop()
+
+ self.assertEqual(False, task.running)
+ self.assertEqual(0, task.event.set.called)
+ self.assertEqual(0, task.thread.join_task.called)
+
+ @patch('taskthread.TaskThread')
+ def test_shutdown(self, TaskThreadMock):
+ task = TimerTask(my_func)
+ task.thread = TaskThreadMock.return_value
+ task.running = False
+ task.shutdown()
+ task.thread.join.assert_called_once_with(2)
+
+ def test__exec_if_threshold_met(self):
+ self.called = False
+
+ def exec_fcn():
+ self.called = True
+
+ def count_fcn():
+ return 10
+
+ task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1)
+ task.last_count = 9
+ task._exec_if_threshold_met()
+ self.assertTrue(self.called)
+ self.assertEqual(10, task.last_count)
+
+ def test__exec_if_threshold_met_not_met(self):
+
+ def exec_fcn():
+ raise Exception("This shouldn't happen!!")
+
+ def count_fcn():
+ return 10
+
+ task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=10)
+ task.last_count = 9
+ task._exec_if_threshold_met()
+ self.assertEqual(9, task.last_count)
+
+ def test__exec(self):
+ self.called = False
+
+ def exec_fcn():
+ self.called = True
+
+ task = TimerTask(exec_fcn)
+ task._exec()
+ self.assertTrue(self.called)
+
+ def test__exec_threshold(self):
+ self.called = False
+
+ def exec_fcn():
+ self.called = True
+
+ def count_fcn():
+ return 1
+
+ task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1)
+ task._exec()
+ self.assertTrue(self.called)
+
+ @patch('threading.Event')
+ def test__wait(self, event_mock):
+ task = TimerTask(my_func)
+ event = event_mock.return_value
+
+ task._wait()
+ event.wait.assert_called_once_with(timeout=task.delay)
+ self.assertEqual(1, event.clear.called)
+
+ @patch('threading.RLock')
+ def test__exit_loop(self, mock_rlock):
+ task = TimerTask(my_func)
+ task.running = False
+ lock = mock_rlock.return_value
+ lock.__enter__ = Mock()
+ lock.__exit__ = Mock()
+ self.assertTrue(task._exit_loop())
+ self.assertEqual(1, lock.__enter__.called)
+ lock.__exit__.assert_called_once_with(None, None, None)
+
+ @patch('threading.RLock')
+ def test__exit_loop_running(self, mock_rlock):
+ lock = mock_rlock.return_value
+ lock.__enter__ = Mock()
+ lock.__exit__ = Mock()
+ task = TimerTask(my_func)
+ task.running = True
+ self.assertFalse(task._exit_loop())
+ self.assertEqual(1, lock.__enter__.called)
+ lock.__exit__.assert_called_once_with(None, None, None)
+
+ @patch('threading.RLock')
+ @patch('threading.Event')
+ def test__run_threshold_timer(self, event_mock, rlock_mock):
+ self.task = None
+ event = event_mock.return_value
+ lock = rlock_mock.return_value
+ lock.__enter__ = Mock()
+ lock.__exit__ = Mock()
+
+ def exec_fcn():
+ self.task.running = False
+
+ self.task = TimerTask(exec_fcn)
+ self.task._run_threshold_timer()
+
+ self.assertFalse(self.task.running)
+ self.assertEqual(2, event.wait.call_count)