We’re used to working with Python, so the examples I’ll use will be either pure Python or Python-based pseudocode. Python’s syntax is simple and elegant enough that the examples should be readable even for people who don’t know the language.
I’m going to explore a problem common to the backend of any application with heavy processing requirements: for example, reprocessing all links of every object of a class after an update, or recalculating an attribute that depends on expensive functions. I’ll walk through asynchronous solutions to it.
In this first entry I’ll model and explain the problem in detail and present a first solution. We’ll iterate this solution in future posts to make improvements.
Using this code example:
```python
def foo_action(foo_obj):
    # ...
    # Do process
    # ...
    foo_obj.modify_state(to=state.COMPLETED)
    # ...
    # Do save
    # ...
    foo_obj.save()
    # ...
    # Do post-save incredibly heavy process
    # ...
    for bar_obj in bar.get_all_objects():
        bar_obj.recalculate_value_using_foo(foo_obj)
```
With this function we have two separate steps: first, process and save the object; second, a heavy post-save process. In most cases, signals and handlers are an effective way to manage this situation, but if we’re working with legacy code and don’t have the opportunity to use that mechanism, we’re forced to fall back on polling. Polling is a mechanism that periodically asks whether there is work to do and, if there is, does it.
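As a minimal illustration of polling (with a hypothetical in-memory list of pending objects standing in for a real datastore), the mechanism boils down to this:

```python
import time

def poll_once(pending):
    """One polling pass: ask whether there is work to do and, if so, do it."""
    processed = []
    while pending:
        foo_obj = pending.pop(0)
        processed.append(foo_obj)  # stand-in for the heavy post-save work
    return processed

def poll_forever(pending, interval_seconds=60):
    """Repeat the check periodically; this loop is the polling itself."""
    while True:
        poll_once(pending)
        time.sleep(interval_seconds)
```

The key trade-off is the interval: poll too often and you waste cycles asking when there is nothing to do; poll too rarely and the work sits unprocessed.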
In this case we have a light processing of the object (foo) that changes the state to completed and, after that, saves it. The problem is that we need to recalculate the value of the other type of object (bar) and we know that these calculations are really intensive. As it isn’t necessary to calculate these bar values immediately once the foo object is modified, we can move this process to an asynchronous task to avoid blocking foo objects.
Solution: first approach
We’ll start with a really naive approach where the periodic process is implemented in a script and scheduled using a tool like cron.
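A sketch of that naive cron-driven script follows; the model classes here are hypothetical stand-ins for whatever ORM the real application uses:

```python
# recalculate_bars.py -- scheduled with a crontab entry such as:
#     * * * * * /usr/bin/python /path/to/recalculate_bars.py

class Foo:
    """Hypothetical stand-in for the real foo model."""
    def __init__(self, completed=False):
        self.completed = completed
        self.post_processed = False

class Bar:
    """Hypothetical stand-in for the real bar model."""
    def __init__(self):
        self.value = 0

    def recalculate_value_using_foo(self, foo_obj):
        self.value += 1  # stand-in for the really intensive calculation

def run(foo_objects, bar_objects):
    """One cron-triggered pass over every completed, not-yet-processed foo."""
    for foo_obj in foo_objects:
        if foo_obj.completed and not foo_obj.post_processed:
            for bar_obj in bar_objects:
                bar_obj.recalculate_value_using_foo(foo_obj)
            foo_obj.post_processed = True  # don't redo the work on the next run

if __name__ == "__main__":
    run([], [])  # the real script would load the objects from the database
```

Note the `post_processed` flag: without some marker like it, every cron run would redo work that was already done.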
To improve on this we’ll use Celery as our async-task magic machine. Celery is a Python library that provides a way to encapsulate functions as tasks, queue them, and process them once a processing slot is available. We need to know six basic concepts:
- Task: Basic processing unit of celery. The function that will be executed.
- Queue: Where you will store your celery tasks waiting to be processed.
- Worker: A process that executes your tasks. It’s like another instance of your application dedicated to executing celery tasks.
- Broker: The manager that receives tasks, handles queues and sends tasks to workers.
- Periodic Task: Like normal tasks, but executed periodically using time intervals or crontabs.
- Celery beat: A special process of celery that manages Periodic Tasks, queuing them when the period is met.
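To make these concepts concrete, here is a minimal Celery configuration sketch; the broker URL, module name and task body are assumptions for illustration, not this project’s actual setup:

```python
from celery import Celery
from celery.schedules import crontab

# Broker: here Redis, assumed to be running locally
# (RabbitMQ is the other common choice).
app = Celery('tasks', broker='redis://localhost:6379/0')

# Task: the basic processing unit.
@app.task
def heavy_recalculation():
    pass  # the expensive work would go here

# Periodic task: celery beat queues it whenever the schedule fires.
app.conf.beat_schedule = {
    'recalculate-every-minute': {
        'task': 'tasks.heavy_recalculation',
        'schedule': crontab(),  # crontab() with no arguments means every minute
    },
}

# Worker and beat are separate processes, started from the shell:
#     celery -A tasks worker
#     celery -A tasks beat
```

The queue itself lives inside the broker; the worker and beat processes only talk to it, which is what lets you scale workers independently of the application.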
Using Celery we can split the problem code as follows:
```python
def foo_action(foo_obj):
    # ...
    # Do process
    # ...
    foo_obj.modify_state(to=state.COMPLETED)
    # ...
    # Do save
    # ...
    foo_obj.save()


@periodic_task(run_every=crontab('*', '*', '*', '*', '*'))
def foo_action_postsave(foo_obj):
    # ...
    # Do post-save incredibly heavy process
    # ...
    for bar_obj in bar.get_all_objects():
        bar_obj.recalculate_value_using_foo(foo_obj)
```
I simply extracted the post-save process into a periodic task, which will be executed following the crontab “* * * * *” (every minute). This means that every minute our celery beat will queue a foo_action_postsave task that recalculates the value of each bar object.
But, as we’re working with a producer-consumer pattern, it’s natural to ask ourselves: What if our producer creates tasks faster than our consumer can execute them? The answer in this case is that eventually our celery queue will collapse.
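A toy simulation using Python’s standard queue module shows why: if each tick produces more tasks than it consumes, the backlog grows linearly and never drains.

```python
import queue

def simulate(produced_per_tick, consumed_per_tick, ticks):
    """Track queue depth over time when production outpaces consumption."""
    q = queue.Queue()
    depths = []
    for _ in range(ticks):
        for _ in range(produced_per_tick):
            q.put("task")  # producer: e.g. celery beat queuing periodic tasks
        for _ in range(min(consumed_per_tick, q.qsize())):
            q.get()  # consumer: a worker finishing a task
        depths.append(q.qsize())
    return depths

# Producing 3 tasks per tick while consuming 1 leaves 2 extra tasks each tick:
# simulate(3, 1, 5) -> [2, 4, 6, 8, 10]
```

Here a “tick” stands in for one beat interval; in our example, every minute the crontab fires regardless of whether the previous task has finished.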
I’ll explore how to navigate this issue in my next labs entry on the topic.
All code used as examples can be found in our own repository: