gva/gnuviechadmin/taskresults/models.py

55 lines
1.7 KiB
Python
Raw Normal View History

"""
This module defines the database models to handle Celery AsyncResults.
"""
from __future__ import unicode_literals
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext as _
from gnuviechadmin.celery import app
class TaskResultManager(models.Manager):
    """Manager that dispatches Celery signatures and records them."""

    def create_task_result(self, creator, signature, notes=''):
        """
        Dispatch ``signature`` and create a record for the started task.

        :param creator: value stored in the record's ``creator`` field
        :param signature: object providing ``apply_async()``; its ``str()``
            form is stored in the record's ``signature`` field
        :param notes: optional text stored in the record's ``notes`` field
        :return: the object returned by this manager's ``create()`` call
        """
        # The textual form is taken before the signature is dispatched.
        signature_text = str(signature)
        async_result = signature.apply_async()
        return self.create(
            task_id=async_result.task_id,
            creator=creator,
            signature=signature_text,
            notes=notes
        )
@python_2_unicode_compatible
class TaskResult(models.Model):
    """
    Database record tracking one asynchronous task by its task id.

    The record keeps the task id, the textual signature, a creator and
    notes, plus the last fetched state and result of the task.
    """
    # NOTE(review): ``_`` is imported in this module as the non-lazy
    # ``ugettext``, so the labels below are translated once at import time.
    # Consider ``ugettext_lazy`` for model field labels -- confirm.
    task_id = models.CharField(_('Task id'), max_length=36)
    signature = models.TextField(_('Task signature'))
    creator = models.TextField(_('Task creator'))
    notes = models.TextField(_('Task notes'))
    result = models.TextField(_('Task result'))
    finished = models.BooleanField(default=False)
    state = models.CharField(_('Task state'), max_length=16)

    objects = TaskResultManager()

    class Meta:
        verbose_name = _('Task result')
        verbose_name_plural = _('Task results')

    def __str__(self):
        """Return creator, task id and a translated yes/no finished flag."""
        return "{creator} ({task_id}): {finished}".format(
            creator=self.creator,
            task_id=self.task_id,
            finished=_('yes') if self.finished else _('no')
        )

    def fetch_result(self):
        """
        Refresh ``state``, ``result`` and ``finished`` from the task backend.

        Does nothing when the record is already marked as finished.
        Otherwise the current task state is copied to ``state``; once the
        task is ready its result is stored as text in ``result`` and
        ``finished`` is set to ``True``.

        Only the attributes of this instance are modified; this method does
        not call ``save()`` itself.
        """
        if self.finished:
            return
        ar = app.AsyncResult(self.task_id)
        self.state = ar.state
        if ar.ready():
            # A failed task is "ready" as well. With the default
            # propagate=True, get() would re-raise the task's exception
            # here and the record could never be marked as finished.
            # propagate=False returns the exception instance instead, so
            # its text is recorded as the result.
            res = ar.get(propagate=False)
            self.result = str(res)
            self.finished = True