Celery
Working with Celery in a Django application
Celery in Django Project
Celery lets you run tasks in a worker process instead of the web process. The web process can then return the HTTP response to the user immediately, even while the task is still running in the worker, so the request cycle is not blocked and the user experience improves.
When you build a web application, try to keep its response time below 500 ms. If a response is slow, find out why and fix it. Celery can help by moving slow work out of the request cycle.
Message broker
A message broker is the store that acts as the transport between producer and consumer.
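As a rough illustration of that role, the sketch below uses Python's standard `queue.Queue` as a stand-in for the broker and a thread as a stand-in for the worker process. It is not how Celery is implemented; the names `broker`, `enqueue`, `worker`, and the task names are made up for this example.

```python
import queue
import threading

broker = queue.Queue()  # stand-in for the message broker: holds pending messages
processed = []          # records what the consumer handled, in order


def enqueue(name, *args):
    """Producer side: put a message on the broker and return at once."""
    broker.put((name, args))


def worker():
    """Consumer side: pull messages until the None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        processed.append(message)


consumer = threading.Thread(target=worker)
consumer.start()

enqueue("send_welcome_email", 42)      # returns without waiting for the worker
enqueue("resize_avatar", 42, "large")
broker.put(None)                       # tell the worker to stop
consumer.join()

print(processed)
```

The producer never calls the task function itself; it only hands a message to the broker, which is why the web process can respond before the work is done.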
The Celery documentation recommends RabbitMQ as the top choice for the message broker because it supports AMQP (Advanced Message Queuing Protocol). In many cases, however, you do not need the features of AMQP, so another message broker such as Redis is also fine.
Result backend
The result backend is the store that saves the results and error information of Celery tasks. Redis is recommended here.
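For reference, a minimal settings sketch that uses Redis for both roles might look like the following. It assumes a Redis server running locally on its default port and a Celery app that loads Django settings with `namespace="CELERY"`; neither is stated above, so adjust the URLs and prefix to your own setup.

```python
# settings.py (fragment)
# Assumption: Redis runs locally on the default port 6379, and the Celery app
# calls app.config_from_object("django.conf:settings", namespace="CELERY").
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"      # message broker
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"  # task results and errors
```

Using separate Redis database numbers for the broker and the result backend is only a convenience for keeping the two kinds of keys apart.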
Database Transactions
This section shows how to prevent a Celery task that depends on a Django database transaction from executing before the database commits that transaction.
A database transaction is a unit of work that is either committed (applied to the database) or rolled back (undone from the database) as a unit.
Most databases use the following pattern:
1. Begin the transaction.
2. Execute a set of data manipulations and/or queries.
3. If no error occurs, then commit the transaction.
4. If an error occurs, then roll back the transaction.
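The pattern above can be sketched with Python's built-in `sqlite3` module, independent of Django. The `account` table, the `transfer` helper, and the balances are invented for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE account ("
    "name TEXT PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO account VALUES ('alice', 100), ('bob', 0)")
conn.commit()


def transfer(amount):
    """Move money from alice to bob as a single unit of work."""
    try:
        # sqlite3 begins the transaction implicitly before the first UPDATE.
        conn.execute(
            "UPDATE account SET balance = balance + ? WHERE name = 'bob'", (amount,)
        )
        conn.execute(
            "UPDATE account SET balance = balance - ? WHERE name = 'alice'", (amount,)
        )
        conn.commit()    # no error: both updates are applied together
        return True
    except sqlite3.IntegrityError:
        conn.rollback()  # error: the first update is undone as well
        return False


def balances():
    return dict(conn.execute("SELECT name, balance FROM account"))


print(transfer(30), balances())   # succeeds and commits
print(transfer(500), balances())  # violates the CHECK constraint and rolls back
```

In the second call, bob's balance is updated first, but the rollback removes that change, so the balances are the same as before the call.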
Database Transactions in Django
Django's default behavior is to autocommit: each query is committed to the database immediately unless a transaction is active. In other words, with autocommit, each query starts its own transaction and either commits or rolls it back. If you have a view with three queries, they run one by one. If one fails, the other two are still committed.
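import logging
from django.contrib.auth.models import User
from django.db import transaction

logger = logging.getLogger(__name__)

def transaction_test(request):
    with transaction.atomic():
        user = User.objects.create_user('santosh', 'santosh@company.com')
        logger.info(f'create user {user.pk}')
        raise Exception('force transaction to rollback')  # the new user is rolled back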
DoesNotExist Exception
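import logging
import time

from celery import shared_task
from django.contrib.auth.models import User
from django.db import transaction
from django.http import HttpResponse

logger = logging.getLogger(__name__)

# The view code; random_username() is a helper defined elsewhere
@transaction.atomic
def transaction_celery(request):
    username = random_username()
    user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
    logger.info(f'create user {user.pk}')
    task_send_welcome_email.delay(user.pk)  # enqueued while the transaction is still open
    time.sleep(1)
    return HttpResponse('test')

# The task code
@shared_task()
def task_send_welcome_email(user_pk):
    user = User.objects.get(pk=user_pk)
    logger.info(f'send email to {user.email} {user.pk}')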
Since the view uses the transaction.atomic decorator, all of its database operations are committed only if the view finishes without raising an error, and that includes the call that enqueues the Celery task. The task itself is fairly simple: we create a user and then pass the primary key to the task, which sends a welcome email.
The time.sleep(1) call keeps the transaction open for an extra second, which makes the race condition easy to reproduce.
When run, you will see the following error:
django.contrib.auth.models.User.DoesNotExist: User matching query does not exist.
Why?
We pause for 1 second after enqueueing the task. Since the worker picks up the task almost immediately, user = User.objects.get(pk=user_pk) runs while the view is still sleeping. The lookup fails because the user is not yet in the database: the transaction in Django has not been committed.
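The same visibility rule can be reproduced without Django or Celery. In the sketch below, two `sqlite3` connections to one database file stand in for the web process and the worker; the `user` table and the `worker_lookup` helper are hypothetical, and SQLite is only a stand-in for whatever database the project uses.

```python
import os
import sqlite3
import tempfile

# A file-backed database, so that two separate connections can share it.
db_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite3")

web = sqlite3.connect(db_path)     # plays the Django view
worker = sqlite3.connect(db_path)  # plays the Celery worker

web.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, email TEXT)")
web.commit()


def worker_lookup(user_id):
    """What the task does: fetch the user by primary key."""
    query = "SELECT email FROM user WHERE id = ?"
    return worker.execute(query, (user_id,)).fetchall()


# The view inserts the user inside a transaction that is still open ...
cursor = web.execute("INSERT INTO user (email) VALUES ('lennon@thebeatles.com')")
user_id = cursor.lastrowid

# ... and the worker runs before the commit: the row is not visible to it.
before_commit = worker_lookup(user_id)

web.commit()

# After the commit, the same lookup finds the row.
after_commit = worker_lookup(user_id)

print(before_commit, after_commit)

web.close()
worker.close()
```

The primary key already exists on the writing connection before the commit, which is why the view can pass it to the task even though no other connection can see the row yet.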
Solution
There are three ways to solve this:
1. Disable the database transaction, so Django would use the autocommit feature. To do so, you can simply remove the transaction.atomic decorator. However, this isn't recommended since the atomic database transaction is a powerful tool.

2. Force the Celery task to run after a period of time. For example, to pause for 10 seconds:

    task_send_welcome_email.apply_async(args=[user.pk], countdown=10)

3. Django has a callback function called transaction.on_commit that executes after a transaction successfully commits. To use this, update the view like so:

    @transaction.atomic
    def transaction_celery2(request):
        username = random_username()
        user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
        logger.info(f'create user {user.pk}')
        # the task does not get called until after the transaction is committed
        transaction.on_commit(lambda: task_send_welcome_email.delay(user.pk))
        time.sleep(1)
        return HttpResponse('test')
Now the task is not enqueued until after the database transaction commits. When the Celery worker looks up the user, the row is there, because the worker code always runs after the Django database transaction has committed successfully.
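To make the ordering concrete, here is a toy model of a commit hook in plain Python. It is not Django's implementation; `ToyTransaction` and the log messages are invented for this sketch. It only mirrors the documented behavior that callbacks run after a successful commit and are discarded on rollback.

```python
class ToyTransaction:
    """Toy stand-in for an atomic block; not Django's implementation."""

    def __init__(self, log):
        self.log = log
        self._callbacks = []

    def on_commit(self, func):
        # Defer the callable instead of running it now.
        self._callbacks.append(func)

    def __enter__(self):
        self.log.append("begin")
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.log.append("commit")
            for func in self._callbacks:  # run only after a successful commit
                func()
        else:
            self.log.append("rollback")   # deferred callbacks are dropped
        return False                      # let any exception propagate


log = []

# Successful block: the task is enqueued after the commit.
with ToyTransaction(log) as tx:
    log.append("create user")
    tx.on_commit(lambda: log.append("enqueue task"))
    log.append("rest of view")

# Failing block: the task is never enqueued.
try:
    with ToyTransaction(log) as tx:
        tx.on_commit(lambda: log.append("enqueue task"))
        raise RuntimeError("force rollback")
except RuntimeError:
    pass

print(log)
```

Note that in the successful block "enqueue task" is logged after "commit", even though the callback was registered before the rest of the view ran.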
It's worth noting that you may not want your transaction to commit right away, especially if you're running in a high-scale environment. If either the database or the instance is at high utilization, forcing a commit will only add to the existing load. In this case, you may want to use the second solution and wait for a sufficient amount of time (20 seconds, perhaps) so that the changes have time to reach the database before the task executes.