gva/gnuviechadmin/taskresults/models.py

53 lines
1.7 KiB
Python

"""
This module defines the database models used to track Celery AsyncResults.
"""
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy
from model_utils.models import TimeStampedModel

from gnuviechadmin.celery import app
class TaskResultManager(models.Manager):
    """
    Manager that dispatches a Celery signature and records it in the database.
    """

    def create_task_result(self, creator, signature, notes=""):
        """
        Dispatch ``signature`` asynchronously and store a matching row.

        :param creator: value stored in the new row's ``creator`` field
        :param signature: object providing ``apply_async()``; its ``str()``
            form is stored in the ``signature`` field
        :param notes: optional free text stored in the ``notes`` field
        :return: the newly created model instance
        """
        # The string form is captured before the task is dispatched.
        signature_text = str(signature)
        async_result = signature.apply_async()
        return self.create(
            task_id=async_result.task_id,
            creator=creator,
            signature=signature_text,
            notes=notes,
        )
class TaskResult(TimeStampedModel):
    """
    Database record describing one dispatched Celery task.

    Rows are created through :class:`TaskResultManager` (``objects``), and
    :meth:`fetch_result` copies the task's current state and, once available,
    its result from the Celery result backend into the instance.
    """

    # Field labels and Meta verbose names are evaluated when this class body
    # runs (i.e. at import time), so they use lazy translation: the label is
    # then resolved in the language active when it is rendered rather than
    # the language active during import.
    # 36 characters fits a canonical UUID string -- presumably the format of
    # Celery task ids here; verify before changing the length.
    task_id = models.CharField(gettext_lazy("Task id"), max_length=36)
    signature = models.TextField(gettext_lazy("Task signature"))
    creator = models.TextField(gettext_lazy("Task creator"))
    notes = models.TextField(gettext_lazy("Task notes"))
    result = models.TextField(gettext_lazy("Task result"))
    finished = models.BooleanField(default=False)
    state = models.CharField(gettext_lazy("Task state"), max_length=16)

    objects = TaskResultManager()

    class Meta:
        verbose_name = gettext_lazy("Task result")
        verbose_name_plural = gettext_lazy("Task results")
        # ``modified`` is not declared in this class; it is expected to come
        # from the TimeStampedModel base.
        ordering = ["-modified"]

    def __str__(self):
        """
        Return ``"<creator> (<task_id>): <yes|no>"`` describing this row.
        """
        # Eager gettext is fine here: this runs per call, not at import time.
        finished_label = _("yes") if self.finished else _("no")
        return "{creator} ({task_id}): {finished}".format(
            creator=self.creator,
            task_id=self.task_id,
            finished=finished_label,
        )

    def fetch_result(self):
        """
        Refresh ``state`` (and ``result``/``finished``) from the Celery backend.

        Does nothing if the row is already marked as finished. Otherwise the
        current task state is copied to ``state``; if the task is ready, the
        string form of its result is stored in ``result`` and ``finished`` is
        set to ``True``.

        The instance is only modified in memory -- this method does not call
        ``save()``, so persisting the update is left to the caller.
        """
        if self.finished:
            return

        async_result = app.AsyncResult(self.task_id)
        self.state = async_result.state
        if not async_result.ready():
            return

        # propagate=False: a failed task yields its exception object instead
        # of re-raising it here, so failures are recorded as text as well.
        outcome = async_result.get(propagate=False, timeout=5)
        self.result = str(outcome)
        self.finished = True