Web applications usually start out simple but can become quite complex, and most of them quickly exceed the responsibility of only responding to HTTP requests.
When that happens, one must make a distinction between what has to happen instantly (usually in the HTTP request lifecycle) and what can happen eventually. Why? Because when your application is under heavy traffic, keeping slow work out of the request path is what keeps response times low.
Operations in a web application can be classified as critical, request-time operations and background tasks, which happen outside request time. These map to the two cases described above:
- needs to happen instantly: request-time operations
- needs to happen eventually: background tasks
Request-time operations can be completed within a single request/response cycle without worrying that the operation will time out or that the user might have a bad experience. Common examples include CRUD (Create, Read, Update, Delete) database operations and user management (Login/Logout routines).
Background tasks are different as they are usually quite time-consuming and are prone to failure, mostly due to external dependencies. Some common scenarios among complex web applications include:
- sending confirmation or activity emails
- crawling and scraping information from various sources every day and storing it
- performing data analysis
- deleting unneeded resources
- exporting documents/photos in various formats
Background tasks are the main focus of this tutorial. The most common programming pattern used for this scenario is the Producer Consumer Architecture.
In simple terms, this architecture can be described like this:
- Producers create data or tasks.
- Tasks are put into a queue that is referred to as the task queue.
- Consumers are responsible for consuming the data or running the tasks.
Usually, the consumers retrieve tasks from the queue in a first-in-first-out (FIFO) fashion or according to their priorities. The consumers are also referred to as workers, and that is the term we will be using throughout, as it is consistent with the terminology used by the technologies discussed.
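To make the pattern concrete, here is a minimal sketch using only Python's standard library. It is not how Celery is implemented: the in-process queue, the single worker thread, and the send_email function are all illustrative assumptions standing in for a real broker, real worker processes, and a real task.

```python
import queue
import threading

# Hypothetical stand-in for a real task queue such as the one Celery manages.
task_queue = queue.Queue()
results = []


def worker():
    """Consumer: take tasks off the queue in FIFO order and run them."""
    while True:
        task = task_queue.get()
        if task is None:  # sentinel value: no more work
            break
        func, args = task
        results.append(func(*args))


def send_email(address):
    """Illustrative task; a real one would talk to a mail server."""
    return 'sent to %s' % address


# Producer: enqueue tasks and move on without running them.
for address in ['a@example.com', 'b@example.com', 'c@example.com']:
    task_queue.put((send_email, (address,)))
task_queue.put(None)

consumer = threading.Thread(target=worker)
consumer.start()
consumer.join()

print(results)
```

With a single worker, the tasks are processed in exactly the order they were queued. With several workers, tasks are still taken off the queue in order, but they may finish in a different order.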
What kind of tasks can be processed in the background? Tasks that:
- are not essential for the basic functionality of the web application
- can’t be run in the request/response cycle since they are slow (I/O intensive, etc.)
- depend on external resources that might not be available or not behave as expected
- might need to be retried at least once
- have to be executed on a schedule
Celery is the de facto choice for doing background task processing in the Python/Django ecosystem. It has a simple and clear API, and it integrates beautifully with Django. It supports various technologies for the task queue and various paradigms for the workers.
In this tutorial, we’re going to create a Django toy web application (dealing with real-world scenarios) that uses background task processing.
Setting Things Up
Assuming you are already familiar with Python package management and virtual environments, let’s install Django:
$ pip install Django
I’ve decided to build yet another blogging application. The focus of the application will be on simplicity. A user can simply create an account and without too much fuss can create a post and publish it to the platform.
Set up the quick_publisher Django project:
$ django-admin startproject quick_publisher
Let’s get the app started:
$ cd quick_publisher
$ ./manage.py startapp main
When starting a new Django project, I like to create a main application that contains, among other things, a custom user model. More often than not, I encounter limitations of the default Django User model. Having a custom User model gives us the benefit of flexibility.
# main/models.py
from django.db import models
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager


class UserAccountManager(BaseUserManager):
    use_in_migrations = True

    def _create_user(self, email, password, **extra_fields):
        # Both an email and a password are mandatory for every account.
        if not email:
            raise ValueError('Email address must be provided')
        if not password:
            raise ValueError('Password must be provided')
        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_user(self, email=None, password=None, **extra_fields):
        return self._create_user(email, password, **extra_fields)

    def create_superuser(self, email, password, **extra_fields):
        extra_fields['is_staff'] = True
        extra_fields['is_superuser'] = True
        return self._create_user(email, password, **extra_fields)


class User(AbstractBaseUser, PermissionsMixin):
    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'

    objects = UserAccountManager()

    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)

    def get_short_name(self):
        return self.email

    def get_full_name(self):
        return self.email

    def __str__(self):
        return self.email
Make sure to check out the Django documentation if you are not familiar with how custom user models work.
Now we need to tell Django to use this User model instead of the default one. Add this line to the quick_publisher/settings.py file:
AUTH_USER_MODEL = 'main.User'
We also need to add the main application to the INSTALLED_APPS list in the quick_publisher/settings.py file. We can now create the migrations, apply them, and create a superuser to be able to log in to the Django admin panel:
$ ./manage.py makemigrations main
$ ./manage.py migrate
$ ./manage.py createsuperuser
Let’s now create a separate Django application that’s responsible for posts:
$ ./manage.py startapp publisher
Let’s define a simple Post model in publisher/models.py:
from django.db import models
from django.utils import timezone
from django.contrib.auth import get_user_model


class Post(models.Model):
    # on_delete is required from Django 2.0 on and accepted by earlier versions.
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)
Hooking the Post model with the Django admin is done in the publisher/admin.py file like this:
from django.contrib import admin
from .models import Post


@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    pass
Finally, let’s hook the publisher application with our project by adding it to the INSTALLED_APPS list.
We can now run the server and head over to http://localhost:8000/admin/ and create our first posts so that we have something to play with:
$ ./manage.py runserver
I trust you’ve done your homework and you’ve created the posts.
Let’s move on. The next obvious step is to create a way to view the published posts.
# publisher/views.py
from django.http import Http404
from django.shortcuts import render
from .models import Post


def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")

    return render(request, 'post.html', context={'post': post})
Let’s associate our new view with a URL in quick_publisher/urls.py:
# quick_publisher/urls.py
from django.conf.urls import url
from django.contrib import admin
from publisher.views import view_post

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    # The named group "slug" is passed to view_post as a keyword argument.
    url(r'^(?P<slug>[a-zA-Z0-9-]+)', view_post, name='view_post')
]
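The post route relies on a named regular expression group: whatever the group called slug captures is handed to view_post as the slug keyword argument. The sketch below reproduces only the matching step with Python's re module. The extract_slug helper is a hypothetical name used for illustration and is not part of Django.

```python
import re

# Same pattern as the view_post route, with the captured group named "slug".
POST_URL = re.compile(r'^(?P<slug>[a-zA-Z0-9-]+)')


def extract_slug(path):
    """Return the slug captured from a URL path, or None if nothing matches."""
    match = POST_URL.match(path)
    return match.group('slug') if match else None


print(extract_slug('my-first-post/'))
print(extract_slug('/'))
```

The first call prints the slug and the second prints None, because a path that starts with a slash contains no characters the group can match.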
Finally, let’s create the template that renders the post in: publisher/templates/post.html
{{ post.title }}
{{ post.content }}
Published by {{ post.author.full_name }} on {{ post.created }}
We can now head to http://localhost:8000/the-slug-of-the-post-you-created/ in the browser. It’s not exactly a miracle of web design, but making good-looking posts is beyond the scope of this tutorial.
Sending Confirmation Emails
Here’s the classic scenario:
- You create an account on a platform.
- You provide an email address to be uniquely identified on the platform.
- The platform checks you are indeed the owner of the email address by sending an email with a confirmation link.
- Until you perform the verification, you are not able to (fully) use the platform.
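Before wiring this into Django, the flow above can be sketched in plain Python. The in-memory users dictionary and the register and verify helpers are assumptions made for illustration; the real application stores the flag and the token on the User model.

```python
import uuid

# Hypothetical in-memory stand-in for the users table.
users = {}


def register(email):
    """Create an unverified user and return the token that would be emailed."""
    token = uuid.uuid4()
    users[token] = {'email': email, 'is_verified': False}
    return token


def verify(token):
    """Mark the matching user as verified; return False for unknown or used tokens."""
    user = users.get(token)
    if user is None or user['is_verified']:
        return False
    user['is_verified'] = True
    return True


token = register('reader@example.com')
first_attempt = verify(token)
second_attempt = verify(token)
unknown_attempt = verify(uuid.uuid4())
print(first_attempt, second_attempt, unknown_attempt)
```

A random UUID works as the token because it is hard to guess, and refusing a token that was already used keeps the link single-use.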
Let’s add an is_verified flag and the verification_uuid on the User model:
# main/models.py
import uuid


class User(AbstractBaseUser, PermissionsMixin):
    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'

    objects = UserAccountManager()

    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)
    # Add the `is_verified` flag and the token used in the verification link.
    is_verified = models.BooleanField('verified', default=False)
    verification_uuid = models.UUIDField('Unique Verification UUID', default=uuid.uuid4)

    def get_short_name(self):
        return self.email

    def get_full_name(self):
        return self.email

    def __str__(self):
        return self.email
Let’s use this occasion to add the User model to the admin:
# main/admin.py
from django.contrib import admin
from .models import User


@admin.register(User)
class UserAdmin(admin.ModelAdmin):
    pass
Let’s apply the changes to the database:
$ ./manage.py makemigrations
$ ./manage.py migrate
We now need to write a piece of code that sends an email when a user instance is created. This is what Django signals are for, and this is a perfect occasion to touch this subject.
Signals are fired before/after certain events occur in the application. We can define callback functions that are triggered automatically when the signals are fired. To make a callback trigger, we must first connect it to a signal.
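The connect-then-fire mechanism can be illustrated with a toy signal class. This is a simplified sketch and not Django's implementation: the Signal class, the outbox list, and the queue_welcome_email callback are all names invented for the example.

```python
class Signal:
    """Toy signal: keeps a list of callbacks and calls each one on send()."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)

    def send(self, sender, **kwargs):
        return [receiver(sender=sender, **kwargs) for receiver in self._receivers]


post_save = Signal()  # illustrative; not Django's implementation
outbox = []


def queue_welcome_email(sender, instance, **kwargs):
    """Callback that runs whenever post_save is sent."""
    outbox.append('welcome email for %s' % instance['email'])


post_save.connect(queue_welcome_email)

# Saving a "model" fires the signal, which triggers the connected callback.
new_user = {'email': 'reader@example.com'}
post_save.send(sender='User', instance=new_user)
print(outbox)
```

The code that fires the signal does not need to know which callbacks exist, which is what lets us attach email sending to the User model without touching its save logic.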
We’re going to create a callback that will be triggered after a User model has been created. We’ll add this code after the User model definition in main/models.py:
from django.db.models import signals
from django.core.mail import send_mail
from django.urls import reverse


def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(instance.verification_uuid)}),
            'noreply@example.com',  # placeholder sender address
            [instance.email],
            fail_silently=False,
        )


signals.post_save.connect(user_post_save, sender=User)
What we’ve done here is we’ve defined a user_post_save function and connected it to the post_save signal (one that is triggered after a model has been saved) sent by the User model.
Django doesn’t just send emails out on its own; it needs to be tied to an email service. For the sake of simplicity, you can add your Gmail credentials in quick_publisher/settings.py, or you can add your favourite email provider.
Here’s what Gmail configuration looks like:
EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = '<YOUR_GMAIL_USERNAME>@gmail.com'
EMAIL_HOST_PASSWORD = '<YOUR_GMAIL_PASSWORD>'
EMAIL_PORT = 587
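To test things out, go into the admin panel and create a new user with a valid email address you can quickly check. Note that the callback calls reverse('verify'), which raises NoReverseMatch until a URL named verify exists, so add the view and URL pattern shown next before testing. Once they are in place, you’ll receive an email with a verification link.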
Here’s how to verify the account:
# main/views.py
from django.http import Http404
from django.shortcuts import render, redirect
from .models import User


def home(request):
    return render(request, 'home.html')


def verify(request, uuid):
    try:
        user = User.objects.get(verification_uuid=uuid, is_verified=False)
    except User.DoesNotExist:
        raise Http404("User does not exist or is already verified")

    user.is_verified = True
    user.save()

    return redirect('home')
Hook the views up in: quick_publisher/urls.py
# quick_publisher/urls.py
from django.conf.urls import url
from django.contrib import admin
from publisher.views import view_post
from main.views import home, verify

urlpatterns = [
    url(r'^$', home, name='home'),
    url(r'^admin/', admin.site.urls),
    url(r'^verify/(?P<uuid>[a-z0-9-]+)/', verify, name='verify'),
    url(r'^(?P<slug>[a-zA-Z0-9-]+)', view_post, name='view_post')
]
Also, remember to create a home.html file under main/templates/home.html. It will be rendered by the home view.
Try to run the entire scenario all over again. If all is well, you’ll receive an email with a valid verification URL. If you follow the URL and then check in the admin, you can see that the account has been verified.
Sending Emails Asynchronously
Here’s the problem with what we’ve done so far. You might have noticed that creating a user is a bit slow. That’s because Django sends the verification email inside the request time.
This is how it works: we send the user data to the Django application. The application creates a User model and then creates a connection to Gmail (or another service you selected). Django waits for the response, and only then does it return a response to our browser.
Here is where Celery comes in. First, make sure it is installed:
$ pip install Celery
We now need to create a Celery application in our Django application:
# quick_publisher/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Celery is a task queue. It receives tasks from our Django application, and it will run them in the background. Celery needs to be paired with other services that act as brokers.
Brokers pass messages between the web application and the Celery workers. In this tutorial, we’ll be using Redis. Redis is easy to install, and we can get started with it without too much fuss.
You can install Redis by following the instructions on the Redis Quick Start page. You’ll need to install the Redis Python library, pip install redis, and the bundle necessary for using Redis and Celery: pip install celery[redis].
Start the Redis server in a separate console like this:
$ redis-server
Let’s add the Celery/Redis related configs into quick_publisher/settings.py:
# REDIS related settings
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
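If the broker URL looks opaque, it can help to take it apart. The following sketch rebuilds the same string and inspects it with urllib.parse from the standard library. It only illustrates what each piece of the URL means and is not something the project itself needs.

```python
from urllib.parse import urlparse

REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

parts = urlparse(BROKER_URL)
print(parts.scheme)    # transport Celery should use to reach the broker
print(parts.hostname)  # machine the Redis server runs on
print(parts.port)      # port the Redis server listens on
print(parts.path)      # '/0' selects Redis database number 0
```

Pointing the broker and the result backend at the same Redis database, as the settings above do, is fine for a toy project.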
Before anything can be run in Celery, it must be declared as a task.
Here’s how to do this:
# main/tasks.py
import logging

from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app


@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
    try:
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'noreply@example.com',  # placeholder sender address
            [user.email],
            fail_silently=False,
        )
    except UserModel.DoesNotExist:
        logging.warning("Tried to send verification email to non-existing user '%s'" % user_id)
What we’ve done here is this: we moved the code that sends the verification email into another file called tasks.py.
A few notes:
- The name of the file is important. Celery goes through all the apps in INSTALLED_APPS and registers the tasks in tasks.py files.
- Notice how we decorated the send_verification_email function with @app.task. This tells Celery this is a task that will be run in the task queue.
- Notice how the task takes a user_id argument rather than a User object. This is because we might have trouble serializing complex objects when sending the tasks to Celery. It’s best to keep them simple.
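The serialization point can be shown without Celery. The sketch below assumes task messages are encoded as JSON, which is Celery 4's default serializer. The User class is a plain illustrative object and not the Django model.

```python
import json


class User:
    """Illustrative plain object standing in for a model instance."""

    def __init__(self, pk, email):
        self.pk = pk
        self.email = email


user = User(pk=42, email='reader@example.com')

# A primary key is a plain integer, so it serializes without trouble.
message = json.dumps({'task': 'send_verification_email', 'args': [user.pk]})

# The object itself is not JSON serializable.
try:
    json.dumps({'task': 'send_verification_email', 'args': [user]})
    object_serialized = True
except TypeError:
    object_serialized = False

print(message)
print(object_serialized)
```

Passing the primary key has a second benefit: the worker loads the row when the task actually runs, so it sees the current state of the user rather than a stale copy.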
Going back to main/models.py, the signal code turns into:
from django.db.models import signals
from main.tasks import send_verification_email


def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)


signals.post_save.connect(user_post_save, sender=User)
Notice how we call the .delay method on the task object. This means we’re sending the task off to Celery and we don’t wait for the result. If we used send_verification_email(instance.pk) instead, the function would run synchronously in the current process, inside the request, without going through Celery at all, which is not what we want.
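The difference between calling a task directly and calling .delay can be sketched with a toy wrapper. This is not Celery's implementation: ToyTask, the in-process queue, and the release_worker event are assumptions used only to make the order of events visible. In Celery, as in this sketch, a direct call runs the function in the caller, while .delay only queues a message.

```python
import queue
import threading


class ToyTask:
    """Illustrative wrapper: calling runs inline, delay() hands off to a worker."""

    def __init__(self, func, task_queue):
        self.func = func
        self.task_queue = task_queue

    def __call__(self, *args):
        return self.func(*args)  # runs in the caller and blocks until done

    def delay(self, *args):
        self.task_queue.put((self.func, args))  # only queues the work


log = []
task_queue = queue.Queue()
release_worker = threading.Event()


def send_verification_email(user_id):
    log.append('email sent to user %s' % user_id)


def worker():
    release_worker.wait()  # simulate a worker that picks the task up later
    func, args = task_queue.get()
    func(*args)


task = ToyTask(send_verification_email, task_queue)
thread = threading.Thread(target=worker)
thread.start()

task(1)  # direct call: finished before the next line runs
log.append('after direct call')

task.delay(2)  # queued: the caller moves on first
log.append('after delay')

release_worker.set()
thread.join()
print(log)
```

The log shows the direct call completing before the caller continues, while the delayed call completes only after the caller has already moved on.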
Before you start creating a new user, there’s a catch. Celery is a service, and we need to start it. Open a new console, make sure you activate the appropriate virtualenv, and navigate to the project folder.
$ celery -A quick_publisher worker --loglevel=debug --concurrency=4
This starts a Celery worker with four worker processes. Now you can finally go and create another user. Notice how there’s no delay, and make sure to watch the logs in the Celery console to see whether the tasks are properly executed. The output should look something like this:
[2017-04-28 15:00:09,190: DEBUG/MainProcess] Task accepted: main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] pid:62065
[2017-04-28 15:00:11,740: INFO/PoolWorker-2] Task main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] succeeded in 2.5500912349671125s: None
Periodic Tasks With Celery
Here’s another common scenario. Most mature web applications send their users lifecycle emails in order to keep them engaged. Some common examples of lifecycle emails:
- monthly reports
- activity notifications (likes, friendship requests, etc.)
- reminders to accomplish certain actions (“Don’t forget to activate your account”)
Here’s what we’re going to do in our app. We’re going to count how many times every post has been viewed and send a daily report to the author. Once every single day, we’re going to go through all the users, fetch their posts, and send an email with a table containing the posts and view counts.
Let’s change the Post model so that we can accommodate the view counts scenario.
class Post(models.Model):
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    view_count = models.IntegerField("View Count", default=0)

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)
As always, when we change a model, we need to migrate the database:
$ ./manage.py makemigrations
$ ./manage.py migrate
Let’s also modify the view_post Django view to count views:
def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")

    # Count this visit before rendering the page.
    post.view_count += 1
    post.save()

    return render(request, 'post.html', context={'post': post})
It would be useful to display the view_count in the template. Add this
Viewed {{ post.view_count }} times
somewhere inside the publisher/templates/post.html file. Do a few views on a post now and see how the counter increases.
Let’s create a Celery task. Since it is about posts, I’m going to place it in publisher/tasks.py:
# publisher/tasks.py
from django.template import Template, Context
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
from publisher.models import Post

REPORT_TEMPLATE = """
Here's how you did till now:

{% for post in posts %}
"{{ post.title }}": viewed {{ post.view_count }} times |
{% endfor %}
"""


@app.task
def send_view_count_report():
    for user in get_user_model().objects.all():
        posts = Post.objects.filter(author=user)
        if not posts:
            # Nothing to report for users without posts.
            continue

        template = Template(REPORT_TEMPLATE)

        send_mail(
            'Your QuickPublisher Activity',
            template.render(context=Context({'posts': posts})),
            'noreply@example.com',  # placeholder sender address
            [user.email],
            fail_silently=False,
        )
Every time you make changes to the Celery tasks, remember to restart the Celery process. Celery needs to discover and reload tasks. Before creating a periodic task, we should test this out in the Django shell to make sure everything works as intended:
$ ./manage.py shell
In [1]: from publisher.tasks import send_view_count_report
In [2]: send_view_count_report.delay()
Hopefully, you received a nifty little report in your email.
Let’s now create a periodic task. Open up quick_publisher/celery.py and register the periodic tasks:
# quick_publisher/celery.py
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'send-report-every-single-minute': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(),  # change to `crontab(minute=0, hour=0)` if you want it to run daily at midnight
    },
}
So far, we created a schedule that would run the task publisher.tasks.send_view_count_report every minute as indicated by the crontab() notation. You can also specify various Celery Crontab schedules.
Open up another console, activate the appropriate environment, and start the Celery Beat service.
$ celery -A quick_publisher beat
The Beat service’s job is to push tasks to Celery according to the schedule. Take into account that the schedule makes the send_view_count_report task run every minute according to the setup. It’s good for testing but not recommended for a real-world web application.
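To see what "pushing tasks according to a schedule" amounts to, here is a toy scheduler built on the standard library's sched module. It is only a sketch of the idea and not how Celery Beat works internally. The simulated clock, the 60-second interval, and the limit of three runs are assumptions chosen so that the example finishes instantly.

```python
import queue
import sched

task_queue = queue.Queue()

# Simulated clock so the example finishes instantly instead of waiting minutes.
clock = {'now': 0}


def fake_time():
    return clock['now']


def fake_sleep(seconds):
    clock['now'] += seconds


scheduler = sched.scheduler(fake_time, fake_sleep)
INTERVAL = 60  # seconds between runs
RUNS = 3       # stop after three runs to keep the sketch finite


def enqueue_report(remaining):
    """Push the task name onto the queue, then schedule the next run."""
    task_queue.put(('publisher.tasks.send_view_count_report', clock['now']))
    if remaining > 1:
        scheduler.enter(INTERVAL, 1, enqueue_report, argument=(remaining - 1,))


scheduler.enter(INTERVAL, 1, enqueue_report, argument=(RUNS,))
scheduler.run()

queued = []
while not task_queue.empty():
    queued.append(task_queue.get())
print(queued)
```

The scheduler never runs the report itself. It only places a message on the queue at the right moment, and a worker is still needed to pick it up, which is why the Beat service and the worker run as separate processes.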
Making Tasks More Reliable
Tasks are often used to perform unreliable operations, operations that depend on external resources or that can easily fail due to various reasons. Here’s a guideline for making them more reliable:
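- Make tasks idempotent. An idempotent task can be run several times with the same arguments and leave the system in the same state as running it once, which makes it safe to retry. It also helps to make tasks atomic: a task that is stopped midway should either make all of its changes to the system or none at all.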
- Retry the tasks. If the task fails, it’s a good idea to try it again and again until it’s executed successfully. You can do this in Celery with Celery Retry. One other interesting thing to look at is the Exponential Backoff algorithm. This could come in handy when thinking about limiting unnecessary load on the server from retried tasks.
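Both guidelines can be sketched in plain Python. The retry_with_backoff helper, the sent_keys set, and the flaky_send function below are illustrative assumptions and are not Celery APIs. In a real project, Celery's own retry support would replace the helper, and the record of sent reports would live in the database.

```python
import time


def retry_with_backoff(func, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call func until it succeeds, doubling the wait after every failure."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: let the error surface
            sleep(base_delay * (2 ** attempt))


# Illustrative idempotent task: sending twice for the same key has no extra effect.
sent_keys = set()
outbox = []


def send_report_once(key, address):
    if key in sent_keys:
        return 'skipped'
    outbox.append(address)
    sent_keys.add(key)
    return 'sent'


# Simulate an unreliable dependency that fails twice before succeeding.
attempts = {'count': 0}


def flaky_send():
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise ConnectionError('mail server unavailable')
    return send_report_once('report-2017-04-28-user-1', 'reader@example.com')


waits = []
first = retry_with_backoff(flaky_send, sleep=waits.append)  # record waits instead of sleeping
second = send_report_once('report-2017-04-28-user-1', 'reader@example.com')
print(first, second, waits, outbox)
```

The waits grow from one second to two, so a struggling mail server is contacted less and less often, and the second call with the same key is skipped, so a duplicate run does not send a duplicate email.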
Conclusions
I hope this has been an interesting tutorial for you and a good introduction to using Celery with Django.
Here are a few conclusions we can draw:
- It’s good practice to keep unreliable and time-consuming tasks outside the request time.
- Long-running tasks should be executed in the background by worker processes (or other paradigms).
- Background tasks can be used for various tasks that are not critical for the basic functioning of the application.
- Celery can also handle periodic tasks using the celery beat service.
- Tasks can be more reliable if made idempotent and retried (maybe using exponential backoff).