To improve on the solution I explored in the previous entry, we need to dig a little deeper into Celery.

Our goal is to change our tasks’ behavior so that, if a task of the same type is currently being executed, the duplicate is marked as aborted (or similar) and not executed. To achieve this we need to use Celery signals.

Celery provides a number of signals that are fired when specific actions take place. We’ll take advantage of the before_task_publish signal, which is dispatched just before a task is published to our broker.
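As a quick refresher, a handler is attached to a Celery signal with its connect decorator. A minimal sketch of the wiring (the handler name on_task_publish is just a placeholder) looks like this:

from celery.signals import before_task_publish

@before_task_publish.connect
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    # 'sender' is the name of the task about to be sent to the broker
    print('About to publish task: %s' % sender)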

Solution: Third approach

I’ve defined the following function, which will be called when the signal is fired:

import logging

from celery import current_app


def prevent_single_task_duplication(sender, body, **kwargs):
    # Get the task class registered under this name
    task = current_app.tasks.get(sender)

    logger = logging.getLogger('celery.tasks')

    # If it's a single-run task whose lock is held, a duplicate is already running: revoke it.
    # Note: with task protocol 2 (Celery 4+) the task id lives in headers['id'] instead of body['id'].
    if getattr(task, 'single_run', False) and hasattr(task, 'lock') and task.lock.locked():
        logger.info("%s > Task (%s) revoked because it is currently being executed", task.TAG, body['id'])
        current_app.control.revoke(body['id'])

This function checks whether the task about to be queued is a single-run task (i.e. by looking for the single_run attribute) and whether its lock is currently held. If both criteria are met, the task is revoked before being queued.
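One detail the snippet above doesn’t show is the wiring itself: the function still has to be connected to the signal somewhere Celery imports at startup. The plain (non-decorator) form of connect would look like this:

from celery.signals import before_task_publish

before_task_publish.connect(prevent_single_task_duplication)

It’s worth noting that revoke doesn’t prevent the message from being published; it broadcasts the task id to the workers, which then discard the message when it arrives and mark the task as revoked.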

These tasks are still tracked, so for monitoring purposes we can see which tasks were revoked.
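If you want to check this from code rather than from a monitoring dashboard, the task state can be read back through AsyncResult, assuming a result backend is configured:

from celery.result import AsyncResult

task_id = '...'  # placeholder: the id of the duplicated task, as logged above
result = AsyncResult(task_id)
print(result.state)  # a task discarded by a worker reports the 'REVOKED' state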

With this third approach, we’ve avoided the need for a ‘cleaner worker’ that constantly tries to execute tasks without success. And that concludes our series of posts on asynchronous solutions for applications with heavy processing requirements!

I hope you’ve enjoyed this series of posts. Feel free to get in touch with any questions!

All of the code used in these examples can be found in our repository:
Code repository

The Celery docs on signals can be found at:
Signals
