This post continues our previous entry about how to daemonize with Celery. We left the solution in a state where our buffer could eventually collapse, because our producer generates tasks faster than our consumer can execute them.

To solve this, I propose asking another question: “Is the foo_action_postsave task currently being executed?”

If our producer can ask this question, it can avoid queuing a new task until the current one has finished. To know whether a task is being executed, we can use a cache-based lock with the following behavior: when a task is about to run, it acquires the lock, does its work and, as soon as it finishes, releases the lock.
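To make the acquire/work/release cycle and the producer's question concrete, here is a minimal sketch. It assumes a plain dict stands in for the shared cache and a deque for the Celery queue; `enqueue_if_idle`, `run_next` and `LOCK_KEY` are hypothetical names. In a real setup the check would presumably go through a cache-based lock like the one below, and the append would be a Celery `.delay()` call.

```python
from collections import deque

# Hypothetical stand-ins: a dict plays the shared cache, a deque plays the queue.
cache = {}
queue = deque()
LOCK_KEY = 'lock_foo_action_postsave'


def enqueue_if_idle(payload):
    """Producer: queue a new task only if no task currently holds the lock."""
    if cache.get(LOCK_KEY, 0) == 1:
        return False  # a task is running: skip instead of piling up work
    queue.append(payload)
    return True


def run_next(work):
    """Consumer: acquire the lock, do the work, then release the lock."""
    payload = queue.popleft()
    cache[LOCK_KEY] = 1  # acquire
    try:
        return work(payload)
    finally:
        cache.pop(LOCK_KEY, None)  # release, even if the work raised


# Usage: a second save arrives while the first one is being processed.
turned_away = []
print(enqueue_if_idle('save #1'))  # True: nothing running, task queued


def work(payload):
    turned_away.append(enqueue_if_idle('save #2'))  # lock held, so False
    return 'processed ' + payload


print(run_next(work))              # processed save #1
print(turned_away)                 # [False]
print(enqueue_if_idle('save #3'))  # True: the lock was released
```

The queue never holds more than one pending task per running one, which is exactly the property the buffer was missing.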

Solution: Second approach

An example of a cache-based lock (this works really well with Django's cache backend):

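from django.core.cache import cache


class CacheLock(object):
    """
    A lock implementation.

    A lock manages an internal value that is 0 or nonexistent when the lock is free and 1 when it is held.
    It can be locked by calling acquire() and freed by calling release().
    """
    def __init__(self, cache_key: str, timeout: int = None):
        """
        Create a lock using the Django cache as backend.

        :param cache_key: Key that will be used in cache to store the lock.
        :param timeout: Seconds until the lock expires on its own (None means it never expires).
        """
        self._cache_key = cache_key
        self._timeout = timeout

    def acquire(self):
        """
        Try to acquire the lock without blocking; following calls fail until it is released.

        :return: True if the lock has been acquired, otherwise False.
        """
        # cache.add() only stores the key if it is not already present and returns
        # True when it did, so checking and setting happen in a single call instead
        # of the racy "locked() then add()" sequence.
        return cache.add(self._cache_key, 1, self._timeout)

    def release(self):
        # Deleting the key frees the lock for the next task.
        cache.delete(self._cache_key)

    def locked(self):
        return cache.get(self._cache_key, 0) == 1

    def __del__(self):
        # Free the lock when this object is garbage-collected.
        self.release()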
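To try the lock semantics without a Django project, the sketch below swaps in a tiny in-memory `FakeCache` (hypothetical; it mimics only the `add`/`get`/`delete` behavior the lock relies on, with an injectable clock) and a trimmed `DemoCacheLock` that receives the cache as a parameter. It shows that `acquire()` never blocks, and why passing a `timeout` is a useful safety net: if a worker dies before calling `release()`, the key expires on its own.

```python
class FakeCache:
    """Hypothetical in-memory stand-in for django.core.cache.cache."""

    def __init__(self, clock):
        self._clock = clock  # callable returning the current time in seconds
        self._data = {}      # key -> (value, expires_at or None)

    def _alive(self, key):
        entry = self._data.get(key)
        if entry is None:
            return False
        expires_at = entry[1]
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # lazily drop expired keys
            return False
        return True

    def add(self, key, value, timeout=None):
        # Like cache.add(): store only if the key is absent, report success.
        if self._alive(key):
            return False
        expires_at = None if timeout is None else self._clock() + timeout
        self._data[key] = (value, expires_at)
        return True

    def get(self, key, default=None):
        return self._data[key][0] if self._alive(key) else default

    def delete(self, key):
        self._data.pop(key, None)


class DemoCacheLock:
    """Same idea as CacheLock, but with the cache injected for testing."""

    def __init__(self, cache, cache_key, timeout=None):
        self._cache, self._cache_key, self._timeout = cache, cache_key, timeout

    def acquire(self):
        return self._cache.add(self._cache_key, 1, self._timeout)

    def release(self):
        self._cache.delete(self._cache_key)

    def locked(self):
        return self._cache.get(self._cache_key, 0) == 1


now = [0.0]
lock = DemoCacheLock(FakeCache(clock=lambda: now[0]), 'lock_demo', timeout=60)
print(lock.acquire())  # True: the lock was free
print(lock.acquire())  # False: already held, returns immediately
now[0] = 61.0          # the holder "crashed" and never called release()
print(lock.locked())   # False: the key expired after 60 seconds
print(lock.acquire())  # True
```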

Using this lock implementation, we can define a base class for Celery tasks that behaves as described:

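from celery import Task


class SingleTask(Task):
    TAG = 'to be defined, could be __name__'
    abstract = True
    single_run = True

    def __init__(self, *args, **kwargs):
        super(SingleTask, self).__init__(*args, **kwargs)
        # One shared cache key per task type, e.g. 'lock_foo_action_postsave'.
        lock_id = 'lock_{}'.format(self.TAG.lower())
        self.lock = CacheLock(cache_key=lock_id)
        self._lock_owner = False

    def __call__(self, *args, **kwargs):
        # Run the task only if no other execution currently holds the lock.
        self._lock_owner = self.lock.acquire()
        if self._lock_owner:
            return super(SingleTask, self).__call__(*args, **kwargs)
        # The lock is held elsewhere: skip this execution.
        return False

    def _release_if_owner(self):
        # Only the execution that acquired the lock may release it; otherwise a
        # skipped execution would free a lock that a running task still holds.
        if self._lock_owner:
            self._lock_owner = False
            self.lock.release()

    def on_success(self, retval, task_id, args, kwargs):
        self._release_if_owner()
        super(SingleTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self._release_if_owner()
        super(SingleTask, self).on_failure(exc, task_id, args, kwargs, einfo)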

We need to override four methods from Celery base tasks:

  • __init__: When Celery creates the task, we assign it a lock using a unique tag.
  • __call__: When the task is called for execution, we try to acquire the lock. If it is acquired, the task is executed; if not, the task is skipped and returns False.
  • on_success: When the task finishes successfully, release the lock.
  • on_failure: When the task fails, release the lock.
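The order in which these four methods fire can be simulated without a broker. The sketch below is hypothetical throughout: `FakeTask` stands in for `celery.Task`, `execute` mimics the order a worker uses (call the task, then `on_success` with the return value or `on_failure` on an exception), a set stands in for the shared cache, and each task instance plays one worker process. One detail it makes visible: `on_success` also runs for a skipped execution (it simply returned False), so the sketch only releases the lock from the execution that acquired it.

```python
class FakeTask:
    """Hypothetical stand-in for celery.Task with only the hooks used here."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def on_success(self, retval, task_id, args, kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass


def execute(task, task_id, *args):
    """Mimic a worker: __call__, then on_success or on_failure."""
    try:
        retval = task(*args)
    except Exception as exc:
        task.on_failure(exc, task_id, args, {}, None)
        return exc
    task.on_success(retval, task_id, args, {})
    return retval


LOCKS = set()  # stand-in for the shared cache: keys of locks currently held


class SingleTaskSketch(FakeTask):
    key = 'lock_demo'

    def __init__(self):
        self.owner = False

    def __call__(self, *args, **kwargs):
        self.owner = self.key not in LOCKS
        if not self.owner:
            return False  # another execution is running: skip
        LOCKS.add(self.key)
        return super().__call__(*args, **kwargs)

    def _release_if_owner(self):
        if self.owner:
            self.owner = False
            LOCKS.discard(self.key)

    def on_success(self, retval, task_id, args, kwargs):
        self._release_if_owner()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self._release_if_owner()


class Demo(SingleTaskSketch):
    def run(self, x):
        if x < 0:
            raise ValueError('negative input')
        # While this execution holds the lock, a second "process" (a separate
        # instance) receives the same task.
        self.nested = execute(Demo(), 'task-2', x)
        self.still_locked = self.key in LOCKS
        return x * 2


first = Demo()
print(execute(first, 'task-1', 21))  # 42
print(first.nested)                  # False: the second execution was skipped
print(first.still_locked)            # True: the skip did not free the lock
print('lock_demo' in LOCKS)          # False: released by on_success
```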

This is all we need to do in order to avoid collapses in our queues.

However, it is important to make sure that our workers can run two tasks concurrently and without prefetching. We need a worker that tries to execute a task even while another task is being executed, so that duplicate tasks are cleaned from the queue (it tries to execute, fails to acquire the lock and immediately exits).
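One way to express this worker setup is sketched below, assuming Celery 4 or later (lowercase setting names) and a configuration module loaded with `app.config_from_object()`; `celeryconfig.py` is a hypothetical file name. Celery cannot switch prefetching off entirely: its optimizing guide recommends a prefetch multiplier of 1 combined with late acknowledgement to reserve only one task per process at a time. The worker can additionally be started with `celery -A proj worker -O fair`, where `proj` is a placeholder for your app module.

```python
# celeryconfig.py (hypothetical module name), Celery 4+ setting names.

# Two worker processes: one can be busy with the long-running task while the
# other keeps draining duplicates that fail to acquire the lock and exit.
worker_concurrency = 2

# Reserve at most one message per process...
worker_prefetch_multiplier = 1

# ...and acknowledge it only after it has run, so a process does not reserve
# a new message while it is still executing the current one.
task_acks_late = True
```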

I agree this isn’t the most elegant approach but, in the third entry of this series, I’ll explain how to avoid having a ‘cleaner worker’ that constantly tries to execute tasks without success.

All code used as examples can be found in our own repository:
Code repository

Celery docs for prefetching and concurrency behavior can be found in:
