None

Django application with Celery


Εliminate response times using Celery for asynchronous task queue in a Django web application.

By Kostas Koutsogiannopoulos

The concept

A user connects to our application, submitting a form with data. The application must process user' s data generating a result. Then must use the result to call another external application that will do another task.

In a classic approach, these time consumming operations would force the user to wait (and the application server's process to be busy) until the external application eventualy respond. This is not always the desired behavior.

We may want, after the initial user interaction, pass the upcoming tasks to a task queue for asyncronous execusion and free the user immediately.

Celery

Celery can implement asynchronous python task queues. You can create a task and use Celery for passing it to a queue and leave the queue to manage it then. Celery needs a message transport like RabbitMQ and Redis in order to send and receive messages. Check here for RabbitMQ and here for Redis installation guides and integration with Celery.

Choose Redis if you need speed or RabbitMQ if you need persistence for your message data. Both of them are feature complete and production ready.

We are assuming that Redis is already installed and active for our demonstration:

username@hostname:~$ redis-cli ping

PONG

Real-world example

We will create a Django application that is displaying a form. A user enters name, e-mail address, and a text message and clicks a submit button. Then the application creates an e-mail message with a subject and a body and uses our application's gmail account to send a mail to the user thanking him for his message and another mail to us informing for users's message.

For the gmail api call we will use the python program described in a previous article: 

Sending e-mails using gmail API

Celery installation

We assume that you already have a python virtual environment in the directory ~/virtual_env with a basic django application named "django_basic_app" inside a project named "django_project".

Activate your environment:

username@hostname:~$ source ~/virtual_env/bin/activate

Install Celery with Redis dependencies (inside your virtualenv):

(virtual_env) username@hostname:~$ pip install celery[redis]

Add the following Celery-related settings in your basic django application settings.py file:

# CELERY SETTINGS
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Athens' # Replace with your timezone

Create a file named "celery.py" in your basic django application:

 ~/django_project/django_basic_app/celery.py
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_basic_app.settings')

from django.conf import settings  # noqa

app = Celery('django_basic_app')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

 

Now create this file:

 ~/django_project/django_basic_app/__init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

 

After that we are ready to create a new application named "feedback" that meets our specifications.

New django application

(virtual_env) username@hostname:~$ cd django_project
(virtual_env) username@hostname:~$ python manage.py startapp feedback

Add the application in your settings.py file:

INSTALLED_APPS = (
# ...
'feedback',
)

Copy the python program we created in in the "feedback" application (gmailTest.py). We will use it as a library in our task so comment out the part with ArgumentParser:

try:
    import argparse
    flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
except ImportError:
    flags = None

 

Now we can create the asyncronous task. Create the following file:

 ~/django_project/feedback/tasks.py

from celery.decorators import task
from celery.utils.log import get_task_logger

from feedback.gmailTest import create_message, get_credentials, SendMessage
import httplib2
import os

from apiclient import discovery
import oauth2client
from oauth2client import client
from oauth2client import tools
import time
import base64

from email.mime.text import MIMEText
from apiclient import errors

logger = get_task_logger(__name__)

@task(name="call_gmail_api")
def auth_and_send(sender, recepient, subject, text_body):
    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=http)
    testMessage = create_message( sender, recepient, subject, text_body )
    time.sleep(100)
    SendMessage( service, sender, testMessage )

 

Note the time.sleep() function that we use to demonstrate that task is running asyncronously.

Create the form:

 ~/django_project/feedback/forms.py
from django import forms
from message.tasks import auth_and_send

class FeedbackForm(forms.Form):
    name = forms.CharField(label='Full name*', max_length=100, required=True)
    email = forms.EmailField(label="email*", required=True)
    message = forms.CharField(
        label="Message*", widget=forms.Textarea(attrs={'rows': 3}))

    def send_email(self):
        body = ('Name: ' + self.cleaned_data['name'] + '\r\n' + \
               'e-mail: ' + self.cleaned_data['email'] + '\r\n' + 'Message: ' + self.cleaned_data['message'])
        auth_and_send.delay(
            'app_account@gmail.com', self.cleaned_data['email'], 'Company thanks for your message', body)
        auth_and_send.delay(
            'app_account@gmail.com', 'app_account@gmail.com', 'Message from Company', body)

 

The key point here is that we just added a .delay() function to our code without any other change in order to implement the asyncronous execution.

Create the view:

 ~/django_project/feedback/views.py
from django.shortcuts import render

from django.views.generic.edit import FormView
from message.forms import FeedbackForm


class FeedbackView(FormView):
    template_name = 'feedback.html'
    form_class = FeedbackForm
    success_url = '.'

    def form_valid(self, form):
        form.send_email()
        return super(FeedbackView, self).form_valid(form)

 

...the template you point in your view:

 ~/django_project/feedback/templates/feedback.html

<div class="row">
  <div class="col s12">
    <h3>Feedback</h3>
    <form action="" method="post">{% csrf_token %}
      {{ form.as_p }}
      <button class="btn waves-effect waves-light" type="submit" name="action">Submit
        <i class="mdi-content-send right"></i>
      </button>
    </form>
  </div>
</div>

 

...and expose the view in your urls.py:

 ~/django_project/django_basic_app/urls.py
from django.conf.urls import url
from feedback.views import FeedbackView

urlpatterns = [
    url(r'^feedback/$', FeedbackView.as_view(), name='feedback'),
# ...
]

 

Now hit your application at https://www.company.com/feedback/

Before you submit the form, dont forget to start the celery worker with this command:

(virtual_env) username@hostname:~$ celery worker -A django_basic_app --loglevel=INFO

  -------------- celery@hostname v3.1.23 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-92-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         feedback:0x7f95a3a65d30
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                
[tasks]
  . call_gmail_api

[2016-07-22 18:15:51,457: INFO/MainProcess] Connected to redis://localhost:6379/0
[2016-07-22 18:15:51,464: INFO/MainProcess] mingle: searching for neighbors
[2016-07-22 18:15:52,468: INFO/MainProcess] mingle: all alone
[2016-07-22 18:15:52,474: WARNING/MainProcess] celery@hostname ready.

 

Submit the form and watch your tasks running asyncronously. They will complete in about 100 seconds due to the time.sleep() function:

[2016-07-22 18:18:10,468: INFO/MainProcess] Received task: call_gmail_api[b4920786-50ab-40d4-8d1a-3a398c47781e]
[2016-07-22 18:18:10,481: INFO/MainProcess] Received task: call_gmail_api[e8152691-60f9-4e55-baaa-d8417b25e2dd]
[2016-07-22 18:19:50,600: INFO/Worker-1] URL being requested: POST https://www.googleapis.com/gmail/v1/users/app_account%40gmail.com/messages/send?alt=json
[2016-07-22 18:19:51,159: WARNING/Worker-1] Message Id: 1561330fe127292b
[2016-07-22 18:19:51,160: INFO/MainProcess] Task call_gmail_api[b4920786-50ab-40d4-8d1a-3a398c47781e] succeeded in 100.68995190699934s: None
[2016-07-22 18:21:31,257: INFO/Worker-1] URL being requested: POST https://www.googleapis.com/gmail/v1/users/app_account%40gmail.com/messages/send?alt=json
[2016-07-22 18:21:31,575: WARNING/Worker-1] Message Id: 156133286f32428f
[2016-07-22 18:21:31,576: INFO/MainProcess] Task call_gmail_api[e8152691-60f9-4e55-baaa-d8417b25e2dd] succeeded in 100.4155570810035s: None

Monitoring

Usefull commands to monitor celery and tasks are:

For a simple curses monitor:

(virtual_env) username@hostname:~/django_project$ celery -A django_basic_app control enable_events
(virtual_env) username@hostname:~/django_project$ celery -A django_basic_app events

 

For a JSON with full statistics:

(virtual_env) username@hostname:~/django_project$ celery -A django_basic_app inspect stats

Also check the documentation for full options about Celery monitoring (console, Flower for web monitoring etc.)

 


View epilis's profile on LinkedIn Visit us on facebook X epilis rss feed: Latest articles