gva/gnuviechadmin/taskresults/models.py
Jan Dittberner 2d05580ed3 Improve taskresults app
- show created/modified fields in admin
- add verbose output to management command
- add date hierarchy and action filters in admin
2023-04-29 11:13:38 +02:00

54 lines
1.6 KiB
Python

"""
This module defines the database models to handle Celery AsyncResults.
"""
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy
from model_utils.models import TimeStampedModel

from gnuviechadmin.celery import app
class TaskResultManager(models.Manager):
    """Manager that dispatches a task signature and records it as a row."""

    def create_task_result(self, creator, signature, notes=""):
        """
        Dispatch ``signature`` asynchronously and store a record for it.

        :param creator: value stored in the record's ``creator`` field
        :param signature: object providing ``apply_async()``; its ``str()``
            representation is stored in the record's ``signature`` field
        :param notes: optional free text stored in the ``notes`` field
        :return: the record created through ``self.create()``
        """
        # Capture the textual form before dispatching, as the original
        # implementation did.
        signature_text = str(signature)
        async_result = signature.apply_async()
        return self.create(
            task_id=async_result.task_id,
            creator=creator,
            signature=signature_text,
            notes=notes,
        )
class TaskResult(TimeStampedModel):
    """
    Database record tracking a dispatched Celery task and its outcome.

    Rows are ordered by the ``modified`` field, newest first; that field is
    presumably provided by ``TimeStampedModel``.
    """

    # Labels use gettext_lazy so they are translated when rendered instead of
    # once, at import time, with whatever language is active at that moment.
    # max_length=36 fits a canonical UUID string -- presumably the task id
    # format; confirm against the configured Celery setup.
    task_id = models.CharField(gettext_lazy("Task id"), max_length=36)
    signature = models.TextField(gettext_lazy("Task signature"))
    creator = models.TextField(gettext_lazy("Task creator"))
    notes = models.TextField(gettext_lazy("Task notes"))
    result = models.TextField(gettext_lazy("Task result"))
    finished = models.BooleanField(default=False)
    state = models.CharField(gettext_lazy("Task state"), max_length=16)

    objects = TaskResultManager()

    class Meta:
        verbose_name = gettext_lazy("Task result")
        verbose_name_plural = gettext_lazy("Task results")
        ordering = ["-modified"]

    def __str__(self):
        """Return ``"<creator> (<task_id>): <yes|no>"`` for this record."""
        finished = _("yes") if self.finished else _("no")
        return f"{self.creator} ({self.task_id}): {finished}"

    def fetch_result(self):
        """
        Refresh ``state``, ``result`` and ``finished`` from the result backend.

        Does nothing when the record is already marked as finished. The
        instance is only modified in memory; this method does not call
        ``save()``.
        """
        if self.finished:
            return

        ar = app.AsyncResult(self.task_id)
        if ar.ready():
            # A failed task is "ready" as well. With propagate=False the
            # stored exception is returned instead of being re-raised here,
            # so the failure gets recorded rather than aborting every refresh.
            res = ar.get(propagate=False)
            self.result = str(res)
            self.finished = True
        # Read the state after the readiness check so that a record marked as
        # finished never keeps a stale, non-final state.
        self.state = ar.state