
Daemons with Celery II

April 27, 2016 by José Antonio Perdiguero

Continuing from our previous entry on how to daemonize with Celery: we left the solution in a state where our buffer could eventually collapse, because our producer generates tasks faster than our consumer can execute them.

To solve this, I propose another question: “Is the foo_action_postsave task currently being executed?”

If we enable our producer to ask this question, we can avoid queuing a new task until the current one has finished. To know whether a task is being executed, we can use a cache-based lock with the following behavior: when a task is about to be executed, acquire the lock, do the processing and then, just after it finishes, release the lock.

Solution: Second approach

An example of a cache-based lock (this works really well with Django's cache backend):

from django.core.cache import cache


class CacheLock(object):
    """
    A lock implementation.

    A lock manages an internal value that is 0 or nonexistent when the lock is free and 1 when it is held.
    It can be locked by calling acquire() and freed by calling release().
    """
    def __init__(self, cache_key: str, timeout: int = None):
        """
        Create a lock using the Django cache as backend.

        :param cache_key: Key that will be used in the cache to store the lock.
        :param timeout: Time until the lock expires.
        """
        self._cache_key = cache_key
        self._timeout = timeout

    def acquire(self):
        """
        Acquire the lock, blocking subsequent calls.

        :return: True if the lock has been acquired, otherwise False.
        """
        if not self.locked():
            cache.add(self._cache_key, 1, self._timeout)
            result = True
        else:
            result = False
        return result

    def release(self):
        """Release the lock."""
        cache.delete(self._cache_key)

    def locked(self):
        """Check whether the lock is currently held."""
        return cache.get(self._cache_key, 0) == 1

    def __del__(self):
        self.release()
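To see the lock semantics in isolation, here is a minimal, runnable sketch that swaps the Django cache for an in-memory stand-in (the `FakeCache` class below is purely illustrative, implementing just the `add`/`get`/`delete` calls the lock uses):

```python
# In-memory stand-in for django.core.cache.cache, for illustration only.
class FakeCache:
    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Like Django's cache.add: only sets the key if it doesn't exist yet.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def get(self, key, default=None):
        return self._data.get(key, default)

    def delete(self, key):
        self._data.pop(key, None)


cache = FakeCache()


class CacheLock:
    def __init__(self, cache_key, timeout=None):
        self._cache_key = cache_key
        self._timeout = timeout

    def acquire(self):
        if not self.locked():
            cache.add(self._cache_key, 1, self._timeout)
            return True
        return False

    def release(self):
        cache.delete(self._cache_key)

    def locked(self):
        return cache.get(self._cache_key, 0) == 1


lock = CacheLock('lock_foo_action_postsave')
print(lock.acquire())  # True: the lock was free
print(lock.acquire())  # False: already held
lock.release()
print(lock.acquire())  # True: free again after release
```

Note that checking `locked()` and then calling `add()` is not atomic; Django's `cache.add` itself returns True only when the key was absent, which is a tighter way to acquire if strict atomicity matters.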

Using this lock implementation, we can define a base class for Celery tasks that behaves as we described:

from celery import Task


class SingleTask(Task):
    TAG = 'to be defined, could be __name__'
    abstract = True
    single_run = True

    def __init__(self, *args, **kwargs):
        super(SingleTask, self).__init__(*args, **kwargs)
        lock_id = 'lock_{}'.format(self.TAG.lower())
        self.lock = CacheLock(cache_key=lock_id)

    def __call__(self, *args, **kwargs):
        if self.lock.acquire():
            result = super(SingleTask, self).__call__(*args, **kwargs)
            self.lock.release()
            return result
        else:
            return False

    def on_success(self, retval, task_id, args, kwargs):
        self.lock.release()
        super(SingleTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self.lock.release()
        super(SingleTask, self).on_failure(exc, task_id, args, kwargs, einfo)

We need to override four methods of the Celery base task:

  • __init__: When Celery creates the task, we assign it a lock using a unique tag.
  • __call__: When the task is called for execution, we try to acquire the lock. If acquired, the task is executed and afterwards the lock is released. If not acquired, the task is ignored.
  • on_success: When the task finishes, release the lock.
  • on_failure: When the task fails, release the lock.

This is all we need to do in order to avoid collapses in our queues.

But it is important to make sure that our workers can run two tasks concurrently and without prefetching. We need a worker that tries to execute a task even if another task is currently being executed, so that tasks are cleared from the queue (it tries to execute, fails to acquire the lock and immediately exits).
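For reference, one way to get that worker behavior is through the Celery settings. This is a sketch using the Celery 3.x setting names that were current when this article was written; `celeryconfig.py` is an assumed module name:

```python
# celeryconfig.py -- sketch, not a complete configuration.

# Two worker processes, so one can pick up (and immediately discard)
# queued tasks while the other is busy executing.
CELERYD_CONCURRENCY = 2

# Reserve at most one task per process instead of prefetching batches,
# so queued tasks are not held back behind a long-running one.
CELERYD_PREFETCH_MULTIPLIER = 1
```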

I agree this isn’t the most elegant approach but, in the third entry of this series, I’ll explain how to avoid having a ‘cleaner worker’ that constantly tries to execute tasks without success.

All the code used in these examples can be found in our repository:
Code repository

Celery docs for prefetching and concurrency behavior can be found in:
Concurrency
Prefetch



2 thoughts on “Daemons with Celery II”

Rubén

Hi José Antonio,
Great article series, thank you for sharing. Just a point and a question:
In the CacheLock code, lines 25 to 29 contain wrongly indented code. In addition, it could be easier to read as something like:

if not self.locked():
    cache.add(self._cache_key, 1, self._timeout)
    return True
return False

And the question is: if foo_action is not called more than once per second, why “cronize” the task? In the previous post you said it was part of a process. Why not hook the task call into that process instead?

    Miguel Barrientos

    Hi Rubén,

    Thanks for pointing out the indentation mistake! It should be fixed now.

    Regarding having the “heavy-process” task as a periodic task, it’s just a matter of separation of concerns. In this case the post_save processing could be called directly from “foo_action”, as you point out, but that might not always be the case: I’m thinking of a “cleaning-like” task, where foo_action shouldn’t know about its existence, and a periodic task is very handy there.