Part 9, Async Django, Channels, and Celery
The three problems
A Django app hits three ceilings as it grows:
- Slow external I/O in views: a call to Stripe, OpenAI, or Slack blocks the worker.
- Real-time features: WebSockets, server-sent events, live dashboards. HTTP isn't the right protocol.
- Work that outlives the request: sending email, running reports, processing uploads. It should happen after the user gets their response.
Answers: async views, Channels, Celery. Sometimes more than one.
Async views
Any view can be `async def` (supported since Django 3.1, with the async ORM arriving in 4.1 and both maturing through 5.x):
```python
import asyncio

import httpx
from django.shortcuts import render


async def dashboard(request):
    async with httpx.AsyncClient() as client:
        stripe_task = client.get("https://api.stripe.com/...")
        analytics_task = client.get("https://api.example.com/...")
        stripe_resp, analytics_resp = await asyncio.gather(stripe_task, analytics_task)

    return render(request, "dashboard.html", {
        "stripe": stripe_resp.json(),
        "analytics": analytics_resp.json(),
    })
```

Requirements:
- An ASGI server: `uvicorn`, `daphne`, or `hypercorn`. Gunicorn alone won't run async views (use `gunicorn -k uvicorn.workers.UvicornWorker`).
- `ASGI_APPLICATION` set in `settings.py`, not `WSGI_APPLICATION`.
Async ORM
Django 4.1+ has async-prefixed methods:
```python
async def get_post(slug: str):
    return await Post.objects.aget(slug=slug)


async def list_recent():
    return [
        p
        async for p in Post.objects.filter(published_at__isnull=False)
        .order_by("-published_at")[:10]
    ]
```

Available: `acreate()`, `aget()`, `afirst()`, `alast()`, `acount()`, `aexists()`, `aupdate_or_create()`, `aget_or_create()`, `aupdate()`, `adelete()`, `asave()`.
Gotcha: mixing sync ORM calls into an async view raises `SynchronousOnlyOperation`. Wrap sync code with `sync_to_async`:
```python
from asgiref.sync import sync_to_async


async def handler(request):
    user = await sync_to_async(get_legacy_user)(request)
    ...
```

When async actually helps
- External HTTP calls, run concurrently with `asyncio.gather`.
- WebSockets / SSE, one worker, many long-lived connections.
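The concurrency win is easy to demonstrate without Django at all. A minimal sketch, using `asyncio.sleep` as a stand-in for network latency (the `fake_api_call` helper and its delays are illustrative, not from the examples above):

```python
import asyncio
import time


async def fake_api_call(name: str, delay: float) -> str:
    # Stand-in for an external HTTP request; sleep simulates network latency.
    await asyncio.sleep(delay)
    return f"{name}: ok"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Both "requests" run concurrently, so total time is ~max(delays), not the sum.
    results = await asyncio.gather(
        fake_api_call("stripe", 0.2),
        fake_api_call("analytics", 0.2),
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # finishes in roughly 0.2s, not 0.4s
```

The same shape applies to real `httpx` calls in a view: awaiting them one by one serializes the latency, gathering them overlaps it.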
When it doesn’t
- CPU-bound work, async doesn’t parallelize Python code; you’re still bound by the GIL.
- Pure DB-heavy endpoints, Django’s DB driver isn’t truly async yet (uses a thread pool under the hood). Often no measurable win over sync.
Channels, WebSockets and beyond
Channels extends Django for WebSockets and long-lived protocols.
```bash
pip install channels channels_redis
```

Then in `settings.py`:

```python
INSTALLED_APPS += ["daphne", "channels"]

ASGI_APPLICATION = "mysite.asgi.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
    },
}
```

A minimal consumer:
```python
import json

from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room = self.scope["url_route"]["kwargs"]["room"]
        await self.channel_layer.group_add(self.room, self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        await self.channel_layer.group_discard(self.room, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        payload = json.loads(text_data)
        await self.channel_layer.group_send(
            self.room,
            {"type": "chat.message", "message": payload["message"]},
        )

    async def chat_message(self, event):
        await self.send(text_data=json.dumps({"message": event["message"]}))
```

Routing (mirrors `urls.py`), in `chat/routing.py`:
```python
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r"^ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]
```

And in `mysite/asgi.py` (settings must be configured and the Django ASGI app initialized before any app code is imported):

```python
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

django_asgi_app = get_asgi_application()

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from chat.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})
```

Run with Daphne or uvicorn. The channel layer (here, Redis) is what lets multiple ASGI workers share state; without it, a WebSocket message to user 42 only reaches the worker that user 42 happened to connect to.
Celery, background jobs
Async views don’t replace Celery. asyncio.gather is good for “do these three things concurrently during this request”; Celery is for “queue this work, respond to the user immediately, run it later.”
Setup:
```bash
pip install celery redis
```

In `mysite/celery.py`:

```python
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

app = Celery("mysite")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
```

In `mysite/__init__.py`, so the app loads with Django:

```python
from .celery import app as celery_app

__all__ = ("celery_app",)
```

And in `settings.py`:

```python
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"
CELERY_TASK_ALWAYS_EAGER = False  # True in tests
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TIMEZONE = "UTC"
```

Define tasks in each app:
```python
from celery import shared_task
from django.core.mail import send_mail


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=60,
    max_retries=5,
)
def notify_author(self, post_id: int):
    from .models import Post

    post = Post.objects.select_related("author").get(pk=post_id)
    send_mail(
        subject=f"Your post '{post.title}' was published",
        message="Congrats.",
        from_email="noreply@example.com",
        recipient_list=[post.author.email],
    )
```

Trigger from a view:

```python
def publish(request, slug):
    post = get_object_or_404(Post, slug=slug)
    post.published_at = timezone.now()
    post.save()
    notify_author.delay(post.id)  # returns immediately
    return redirect("blog:detail", slug=slug)
```

Run the worker:

```bash
celery -A mysite worker -l info
```

Scheduled tasks (beat)
```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "daily-digest": {
        "task": "blog.tasks.send_daily_digest",
        "schedule": crontab(hour=7, minute=0),
    },
}
```

Run the scheduler:

```bash
celery -A mysite beat -l info
```

Pitfalls worth knowing
- Pass IDs, not model instances. Serializing an unsaved or complex model across the broker goes wrong quickly. Always pass a primary key and re-fetch inside the task.
- Idempotency. Tasks can be retried. A task that sends an email twice is a bug. Use an idempotency key or a “sent” flag.
- Transactions. Calling `.delay()` inside a DB transaction means the task may start before the transaction commits and see the old state. Use `transaction.on_commit(lambda: task.delay(id))`.
- `CELERY_TASK_ALWAYS_EAGER` runs tasks synchronously; set it to `True` in tests to avoid the broker dependency.
- Monitoring. Use Flower or a commercial APM to see queue length and failures. A silent backlog is a silent outage.
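The idempotency point can be sketched without Celery at all. A framework-free illustration, where the in-memory `sent` set stands in for what would be a "sent" flag or unique row in the database (the function and key names here are hypothetical):

```python
# In production this guard would be a DB column or unique constraint, not a set:
# an in-memory set disappears when the worker restarts.
sent: set[tuple[str, int]] = set()


def notify_author_once(post_id: int) -> bool:
    """Send the notification at most once; return True only when work was done."""
    key = ("notify_author", post_id)
    if key in sent:
        # Retried task: the side effect already happened, so do nothing.
        return False
    # ... send the email here ...
    sent.add(key)
    return True


print(notify_author_once(42))  # True: first delivery
print(notify_author_once(42))  # False: the retry is a no-op
```

With a real database flag, the check and the write should happen atomically (e.g. a unique constraint plus catching the integrity error), or two racing retries can both pass the check.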
Celery vs alternatives
Celery is the 800-pound gorilla, but it’s complex. Simpler options:
- `django-rq`, built on RQ (Redis Queue). Simpler, fewer moving parts.
- `django-q2`, lightweight, scheduling included.
- `huey`, small, no broker required (can use Redis or SQLite).
- Postgres-backed queues (`pgq`, `procrastinate`), one fewer service to run; the DB becomes the queue.
For a modest app, huey or a Postgres-backed queue is usually the better starting point.
Gotchas summary
- Mixing sync and async under ASGI: wrap sync code with `sync_to_async`; wrap async code called from sync with `async_to_sync`.
- Gunicorn + async: use `gunicorn -k uvicorn.workers.UvicornWorker` or switch to `uvicorn` directly.
- Channels without the channel layer: works, but each worker is isolated. Production always needs Redis or a similar layer.
- Celery import order: tasks must be importable at worker startup; `autodiscover_tasks()` expects them in `<app>/tasks.py`.
- `django-redis` vs the Redis cache backend built into Django 4+: the built-in backend is usually enough; reach for `django-redis` only if you need its advanced features.
What’s next
Part 10 ships all this to production.
References
- Async support, Django docs
- Django Channels
- Celery documentation
- Flower, Celery monitoring
- huey, simpler task queue
- procrastinate, Postgres-backed task queue