gva/gnuviechadmin/taskresults/models.py

54 lines
1.7 KiB
Python
Raw Permalink Normal View History

"""
This module defines the database models used to track Celery AsyncResults.
"""
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy
from model_utils.models import TimeStampedModel

from gnuviechadmin.celery import app
class TaskResultManager(models.Manager):
    """
    Manager that starts Celery tasks and records them as task results.

    """

    def create_task_result(self, creator, signature, notes=""):
        """
        Run the given signature asynchronously and store a record for it.

        :param creator: value stored in the new record's ``creator`` field
        :param signature: object providing ``apply_async()``; its string
            representation is stored in the record's ``signature`` field
        :param notes: optional free text stored in the ``notes`` field
        :return: the newly created model instance

        """
        # Capture the textual form before dispatching, matching the order in
        # which the signature is read and then applied.
        signature_text = str(signature)
        async_result = signature.apply_async()
        return self.create(
            task_id=async_result.task_id,
            creator=creator,
            signature=signature_text,
            notes=notes,
        )
2016-01-09 15:26:52 +01:00
class TaskResult(TimeStampedModel):
    """
    Record of an asynchronously started task.

    Stores the task id, the string form of the task signature, the creator,
    free-text notes, the last known state, the stringified result and a flag
    telling whether the result has been fetched.

    """

    # Labels use gettext_lazy so they are translated when they are rendered,
    # not once in whatever language is active while this module is imported.
    task_id = models.CharField(gettext_lazy("Task id"), max_length=36)
    signature = models.TextField(gettext_lazy("Task signature"))
    creator = models.TextField(gettext_lazy("Task creator"))
    notes = models.TextField(gettext_lazy("Task notes"))
    result = models.TextField(gettext_lazy("Task result"))
    finished = models.BooleanField(default=False)
    state = models.CharField(gettext_lazy("Task state"), max_length=16)

    objects = TaskResultManager()

    class Meta:
        verbose_name = gettext_lazy("Task result")
        verbose_name_plural = gettext_lazy("Task results")
        # NOTE(review): "modified" is presumably provided by TimeStampedModel
        # -- confirm against model_utils.
        ordering = ["-modified"]

    def __str__(self):
        # Eager gettext is fine here: this runs at call time, so the currently
        # active language is used.
        return "{creator} ({task_id}): {finished}".format(
            creator=self.creator,
            task_id=self.task_id,
            finished=_("yes") if self.finished else _("no"),
        )

    def fetch_result(self):
        """
        Update this instance from the Celery result for ``task_id``.

        Does nothing when ``finished`` is already set. Otherwise ``state`` is
        refreshed, and if the asynchronous result is ready its value is stored
        as a string in ``result`` and ``finished`` is set to ``True``.

        The instance is only modified in memory; this method does not call
        ``save()``.

        """
        if self.finished:
            return

        async_result = app.AsyncResult(self.task_id)
        self.state = async_result.state
        if async_result.ready():
            # propagate=False: per the Celery documentation a failed task's
            # exception is returned instead of being re-raised, so failures
            # end up in ``result`` as text as well.
            outcome = async_result.get(propagate=False, timeout=5)
            self.result = str(outcome)
            self.finished = True