from .tasks.optimize import optimize from .functions import * import json from celery.contrib.abortable import AbortableTask from leagues.celery import celery, TASK_TIME_LIMIT from leagues.settings import RUN_ENV, SOLVER from scheduler.models import * @celery.task(name='test_task', bind=True, time_limit=TASK_TIME_LIMIT, base=AbortableTask) def test_task(self): status_dict = { c: {} for c in range(10) } i = 0 c = 0 terminate = False self.update_state(state='STARTED', meta={'info':c}) while(c < 10): i += 1 status_dict[c][i] = "".join(random.choices(["A","B","C","D","E","F","G","H","I","J"], k=100)) status_dict['info'] = "RUNNING" self.update_state(state='TESTING', meta=json.dumps(status_dict)) if i == 100000: c += 1 i = 0 print("TESTING",c) if terminate: return "TEST ABORTED" if self.is_aborted(): terminate = True return "TEST SUCCESSFULL"