gva/gnuviechadmin/taskresults/tests/test_models.py

55 lines
1.9 KiB
Python
Raw Normal View History

2015-02-01 03:00:14 +01:00
from __future__ import absolute_import, unicode_literals
from django.test import TestCase
from mock import patch, MagicMock
from taskresults.models import TaskResult
# Fixture values shared by the test cases in this module.
# Task id given to the mocked task objects and used to look up the
# patched ``app.AsyncResult``.
TEST_TASK_UUID = '3120f6a8-2665-4fa3-a785-79efd28bfe92'
# Task name passed to ``TaskResult.objects.create_task_result``.
TEST_TASK_NAME = 'test.task'
# NOTE(review): not referenced by the code visible in this module (the tests
# use TEST_RESULT instead) -- confirm whether it is still needed.
TEST_TASK_RESULT = '4ll y0ur b453 4r3 b3l0ng t0 u5'
class TaskResultTest(TestCase):
    """Tests for ``TaskResult.fetch_result``.

    Every test runs with ``taskresults.models.app`` replaced by a mock, so
    the object returned by ``app.AsyncResult(...)`` can be configured and
    inspected from the test.
    """

    @patch('taskresults.models.app')
    def test_update_taskstatus_unfinished(self, app):
        """A result created from a task that is not ready is unfinished,
        and is finished after ``fetch_result`` sees state ``SUCCESS``.
        """
        pending_task = MagicMock(id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
        pending_task.ready.return_value = False

        task_result = TaskResult.objects.create_task_result(
            pending_task, TEST_TASK_NAME)
        self.assertFalse(task_result.finished)

        # Calling the patched factory yields the same mock instance that
        # the code under test receives, so it can be primed here.
        async_result = app.AsyncResult(TEST_TASK_UUID)
        async_result.state = 'SUCCESS'
        async_result.get.return_value = TEST_RESULT

        task_result.fetch_result()

        async_result.get.assert_called_with(no_ack=True, timeout=1)
        self.assertTrue(task_result.finished)

    @patch('taskresults.models.app')
    def test_update_taskstatus_finished(self, app):
        """Fetching twice for a task that was already ready leaves the
        result finished, with ``get`` on the patched async result called
        exactly once in total.
        """
        ready_task = MagicMock(
            id=TEST_TASK_UUID,
            task_name=TEST_TASK_NAME,
            state='SUCCESS',
            result=TEST_RESULT,
        )
        ready_task.ready.return_value = True

        task_result = TaskResult.objects.create_task_result(
            ready_task, TEST_TASK_NAME)

        task_result.fetch_result()
        self.assertTrue(task_result.finished)

        async_result = app.AsyncResult(TEST_TASK_UUID)
        task_result.fetch_result()

        self.assertEqual(async_result.get.call_count, 1)
        self.assertTrue(task_result.finished)
# Shared mock used by the TaskResultTest methods as the value handed back as
# a task's result. It is defined after that class, which is fine because the
# test methods only look the name up when they run.
TEST_RESULT = MagicMock(task_id=TEST_TASK_UUID, task_name=TEST_TASK_NAME)
TEST_RESULT.ready.return_value = False
class TaskResultManagerTest(TestCase):
    """Tests for the ``TaskResult.objects`` manager."""

    def test_create_task_result(self):
        """``create_task_result`` returns a ``TaskResult`` whose ``task_id``
        and ``task_name`` match the mocked task's id and the given name.
        """
        task = MagicMock(id=TEST_TASK_UUID)

        created = TaskResult.objects.create_task_result(task, TEST_TASK_NAME)

        self.assertIsInstance(created, TaskResult)
        self.assertEqual(created.task_id, TEST_TASK_UUID)
        self.assertEqual(created.task_name, TEST_TASK_NAME)