docs
  • Overview
  • 🐍 PYTHON
    • Type Hints
    • PEP8 Style Guide for Python Code
    • 🏡Pipenv
    • Pathlib
  • 🕸Django
    • 🗄models
      • 🎯Best Practices
      • 🚦Django Signals
    • ⚙️ settings
    • DRF
      • Serializer
      • Authentication
      • Permissions
      • Viewsets
    • Testing
      • Faker and Factory Boy
    • 🧪Test Coverage
    • 💦Python-Decouple
    • Django Tips:
    • 💾Django ORM Queryset
    • Custom Exceptions
    • Celery
    • Resources
  • Deploy
    • 🚀Django Deployment
    • 🔒Setup SSL Certificate
  • 💾Database
    • MongoDB
  • 🛠️DevOps
    • 🖥Scripting
      • A First Script
      • Loops
      • Test
      • Variables
      • External programs
      • Functions
    • Command Line Shortcuts
    • Basic Linux Commands
    • 🎛Microservices
    • 🐳Docker
      • Docker Commands
      • Docker Compose
      • Django project
    • Kubernates
  • 📝Software IDE
    • EditorConfig
    • Linters
    • VsCode
Powered by GitBook
On this page
  • Celery in Django Project
  • Message broker
  • Result backend
  • Database Transactions
  • Database Transactions in Dajngo
  • DoesNotExist Exception

Was this helpful?

  1. 🕸Django

Celery

Working with celery in Django application

Celery in Django Project

Celery can help run tasks on worker process instead of web process, so in web process we can return HTTP response back immediately (even the task in worker process is still running) to our user, the request cycle would not be blocked and the user experience would be better.

When you build web application, you should try to make the response time of your web application lower than 500ms, if some response time is big, you should figure out the reason and try to solve it. Celery can help when solving this problem.

Message broker

Message broker is the store which interacts as transport between producer and consumer.

From Celery doc, RabbitMQ is top recommended for message broker because it supporst AMQP (Advanced Message Queuing Protocol)

But in many cases, we do not have to use features of AMQP so other message broker such as Redis is also ok.

Result backend

Result backend is the store which save the result and error info or Celery task.

Redis is recommended here.

Database Transactions

How to prevent a Celery task dependent on a Django database transaction from executing before the database commits the transaction.

A database transaction is a unit of work that is either committed (applied to the database) or rolled back (undone from the database) as a unit.

Most databases use the following pattern:

  1. Begin the transaction.

  2. Execute a set of data manipulations and/or queries.

  3. If no error occurs, then commit the transaction.

  4. If an error occurs, then roll back the transaction.

Database Transactions in Dajngo

Django default behavior is to autocommit: Each query is directly committed to the database unless a transaction is active. In other words, with autocommit, each query starts a transaction and either commits or rolls back the transaction as well. If you have a view with three queries, then each will run one-by-one. If one fails, the other two will be committed.

def transaction_test(request):
    with transaction.atomic():
        user = User.objects.create_user('santosh', 'santosh@company.com')
        logger.info(f'create user {user.pk}')
        raise Exception('force transaction to rollback')

DoesNotExist Exception

@transaction.atomic
def transaction_celery(request):
    username = random_username()
    user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
    logger.info(f'create user {user.pk}')
    task_send_welcome_email.delay(user.pk)

    time.sleep(1)
    return HttpResponse('test')

        
# The task code

@shared_task()
def task_send_welcome_email(user_pk):
    user = User.objects.get(pk=user_pk)
    logger.info(f'send email to {user.email} {user.pk}')
  1. Since the view uses the transaction.atomic decorator, all database operations are only committed if an error isn't raised in the view, including the Celery task.

  2. The task is fairly simple: We create a user and then pass the primary key to the task to send a welcome email.

  3. time.sleep(1) is used to introduce a race condition.

When run, you will see the following error:

django.contrib.auth.models.User.DoesNotExist: User matching query does not exist.

Why?

  1. We pause for 1 second after enqueueing the task.

  2. Since the task executes immediately, user = User.objects.get(pk=user_pk) fails as the user is not in the database because the transaction in Django has not yet been committed.

Solution

There are three ways to solve this:

  1. Disable the database transaction, so Django would use the autocommit feature. To do so, you can simply remove the transaction.atomic decorator. However, this isn't recommended since the atomic database transaction is a powerful tool.

  2. Force the Celery task to run after a period of time. For example, to pause for 10 seconds:

    task_send_welcome_email.apply_async(args=[user.pk], countdown=10)
  3. Django has a callback function called transaction.on_commit that executes after a transaction successfully commits. To use this, update the view like so:

    @transaction.atomic
    def transaction_celery2(request):
        username = random_username()
        user = User.objects.create_user(username, 'lennon@thebeatles.com', 'johnpassword')
        logger.info(f'create user {user.pk}')
        # the task does not get called until after the transaction is committed
        transaction.on_commit(lambda: task_send_welcome_email.delay(user.pk))
    
        time.sleep(1)
        return HttpResponse('test')

Now, the task doesn't get called until after the database transaction commit. So, when the Celery worker finds the user, it can be found because the code in the worker always runs after the Django database transaction commits successfully.

It's worth noting that you may not want your transaction to commit right away, especially if you're running in a high-scale environment. If either the database or instance are at high-utilization, forcing a commit will only add to the existing usage. In this case, you may want to use the second solution and wait for a sufficient amount of time (20 seconds, perhaps) to ensure that the changes are made to the database before the task executes.

PreviousCustom ExceptionsNextResources

Last updated 3 years ago

Was this helpful?