Tech Tutorial - February 26, 2026
Welcome back, Codeyaan explorers! Today we’ll build a production‑ready real‑time chat application using FastAPI, WebSockets, and Redis. By the end of this tutorial you’ll understand how to wire asynchronous endpoints, broadcast messages efficiently, and secure your sockets with JWTs. Grab your favorite editor, fire up a virtual environment, and let’s dive in.
Why FastAPI + WebSockets + Redis?
FastAPI shines with async support, automatic OpenAPI docs, and a developer‑friendly syntax. WebSockets give us low‑latency, bi‑directional communication—perfect for chat. Redis, with its pub/sub model, scales the broadcast layer without adding heavy infrastructure.
Combining these three tools yields a stack that’s both lightweight and production‑grade. You can run it on a single VM for development, then scale horizontally by adding more FastAPI workers behind a load balancer—all while Redis handles message distribution.
Key requirements
- Python 3.11+
- FastAPI and Uvicorn
- Redis server (Docker or local install)
- PyJWT for token handling
Project setup
First, create a fresh directory and a virtual environment. Then install the dependencies.
mkdir fastapi-chat && cd fastapi-chat
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install fastapi "uvicorn[standard]" redis pyjwt python-multipart  # [standard] pulls in WebSocket support
We’ll keep the code in three files: main.py, auth.py, and models.py. This separation makes the project easier to test and extend.
Authentication with JWT
Even though chat apps often start simple, securing WebSocket connections is non‑negotiable in production. We’ll issue short‑lived JWTs that clients attach as a query parameter.
# auth.py
import time

import jwt

SECRET_KEY = "super-secret-key-change-me"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_SECONDS = 3600


def create_access_token(username: str) -> str:
    payload = {
        "sub": username,
        "iat": int(time.time()),
        "exp": int(time.time()) + ACCESS_TOKEN_EXPIRE_SECONDS,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str) -> str | None:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload.get("sub")
    except jwt.PyJWTError:
        return None
Our create_access_token function will be called from a simple login endpoint we’ll add later. The decode_access_token helper validates the token and extracts the username.
Pro tip: Rotate SECRET_KEY periodically and store it in an environment variable or secret manager. Hard‑coding keys is fine for tutorials, but never in production.
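If you've never peeked inside a JWT, the HS256 scheme PyJWT uses is easy to demystify with the standard library alone. This is an illustrative sketch of what jwt.encode produces (PyJWT adds validation, expiry checks, and more); the b64url and hs256_token helpers are ours, not a real API:

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    # JWTs use unpadded URL-safe base64
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def hs256_token(payload: dict, secret: str) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(payload).encode())
    # The signature covers "header.payload", keyed with the shared secret
    signature = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(signature)}"


token = hs256_token({"sub": "alice", "exp": int(time.time()) + 3600}, "super-secret-key-change-me")
print(token.count("."))  # 2: header.payload.signature
```

Because the signature depends on SECRET_KEY, anyone holding the key can verify that the payload was not tampered with; that is all decode_access_token relies on.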
Defining the data model
We only need a minimal model for chat messages: the sender, timestamp, and content. Pydantic makes validation a breeze.
# models.py
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class ChatMessage(BaseModel):
    sender: str = Field(..., description="Username of the sender")
    # datetime.utcnow is deprecated since Python 3.12; use an aware UTC timestamp
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    content: str = Field(..., min_length=1, max_length=500)
By using default_factory we guarantee every message carries a UTC timestamp without extra code in the endpoint.
FastAPI app skeleton
Now we stitch everything together in main.py. The file will host the HTTP login route, the WebSocket endpoint, and the Redis client.
# main.py
import asyncio

from fastapi import FastAPI, Form, HTTPException, WebSocket, WebSocketDisconnect, status
from fastapi.responses import HTMLResponse
from redis import asyncio as aioredis  # the standalone aioredis package is deprecated; redis-py ships async support

from auth import create_access_token, decode_access_token
from models import ChatMessage

app = FastAPI()
redis = aioredis.from_url("redis://localhost", decode_responses=True)

# Simple in-memory user store (replace with a DB in real apps)
USERS = {"alice": "wonderland", "bob": "builder"}


def get_current_user(token: str) -> str:
    username = decode_access_token(token)
    if not username:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    return username
The get_current_user helper is shared by both the HTTP and WebSocket routes. Let's add the login endpoint next.
Login endpoint
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    # Form(...) reads form-encoded fields (requires python-multipart)
    if USERS.get(username) != password:
        raise HTTPException(status_code=400, detail="Incorrect credentials")
    token = create_access_token(username)
    return {"access_token": token, "token_type": "bearer"}
Clients POST username and password to receive a JWT. In a real deployment you’d hash passwords and use a database, but the principle stays the same.
WebSocket connection handling
WebSockets in FastAPI are just async functions that receive a WebSocket object. We’ll validate the JWT, register the socket in a Redis pub/sub channel, and broadcast incoming messages.
@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    # Extract token from query params: ws://host/ws/chat?token=...
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=1008)  # 1008 = policy violation
        return
    try:
        username = get_current_user(token)
    except HTTPException:
        await websocket.close(code=1008)
        return
    await websocket.accept()
    channel = "room:global"

    # Subscribe to the Redis channel
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)

    async def send_messages():
        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_text(message["data"])

    # Launch a background task that forwards Redis messages to this socket
    # (needs `import asyncio` at the top of main.py)
    send_task = asyncio.create_task(send_messages())
    try:
        while True:
            data = await websocket.receive_text()
            chat_msg = ChatMessage(sender=username, content=data)
            await redis.publish(channel, chat_msg.model_dump_json())
    except WebSocketDisconnect:
        pass
    finally:
        # Clean up on every exit path, not just a clean disconnect
        send_task.cancel()
        await pubsub.unsubscribe(channel)
        await pubsub.close()
The loop receives text from the client, wraps it in a ChatMessage, and publishes it to the Redis channel. All connected sockets listening to the same channel receive the broadcast.
Pro tip: Use separate Redis channels for private rooms or groups. Prefix channels with a UUID for each room to avoid cross‑talk.
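One way to realize that tip is a small registry that mints a UUID-suffixed channel per room, so two rooms that happen to share a display name can never receive each other's traffic. The RoomRegistry class below is a hypothetical sketch, not part of FastAPI or redis-py:

```python
import uuid


class RoomRegistry:
    """Maps room ids to unique Redis channel names."""

    def __init__(self):
        self._channels: dict[str, str] = {}

    def channel_for(self, room_id: str) -> str:
        # Mint the channel on first use, then reuse it for the room's lifetime
        if room_id not in self._channels:
            self._channels[room_id] = f"room:{uuid.uuid4()}"
        return self._channels[room_id]


rooms = RoomRegistry()
general = rooms.channel_for("general")
print(general == rooms.channel_for("general"))  # True: stable per room
print(general == rooms.channel_for("random"))   # False: no cross-talk
```

In the WebSocket endpoint you would then replace the hard-coded "room:global" with rooms.channel_for(room_id), taking room_id from the path or query string.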
Front‑end demo page
To test the server quickly, we’ll serve a minimal HTML page that connects via JavaScript. FastAPI can return an HTMLResponse directly.
@app.get("/", response_class=HTMLResponse)
async def get():
    # Bare-bones demo page: paste a JWT, connect, and chat
    return """<!DOCTYPE html><html><body>
<h1>FastAPI Real-Time Chat</h1>
<input id="token" placeholder="Paste JWT"> <button onclick="connect()">Connect</button><br>
<input id="msg"> <button onclick="ws.send(msg.value)">Send</button>
<pre id="log"></pre>
<script>
let ws;
function connect() {
  ws = new WebSocket(`ws://${location.host}/ws/chat?token=${token.value}`);
  ws.onmessage = e => log.textContent += e.data + "\\n";
}
</script>
</body></html>"""
Open http://localhost:8000, paste the JWT you received from /login, and start chatting. Open multiple tabs to see the real‑time broadcast in action.
Running the stack with Docker Compose
While you can run Redis locally, Docker Compose gives you a reproducible environment. Create a docker-compose.yml file:
version: "3.9"

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  api:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - redis
And a minimal Dockerfile for the FastAPI service:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Generate requirements.txt with the packages we installed earlier, then spin everything up:
docker compose up --build
The API will be reachable at http://localhost:8000, and Redis will be ready for pub/sub without any extra configuration.
Scaling considerations
When you add more FastAPI workers (e.g., via uvicorn --workers 4 or a process manager like Gunicorn), each worker maintains its own WebSocket connections. Redis ensures that a message published by any worker reaches all others, preserving the global chat state.
For massive scale you might consider:
- Using Redis Streams instead of simple pub/sub to retain message history.
- Persisting chat logs to PostgreSQL for audit and replay.
- Deploying a reverse proxy (NGINX or Traefik) that handles TLS termination and sticky sessions.
Persisting messages with Redis Streams
Replace the publish call with xadd to append to a stream. Consumers can then read from the stream with xread and replay missed messages after reconnects.
# Publishing (instead of redis.publish)
await redis.xadd("chat:stream", {"msg": chat_msg.model_dump_json()})

# Consuming (replaces pubsub.listen() in send_messages);
# xread takes a dict of {stream_name: last_seen_id}
last_id = "$"  # "$" = only new entries; use "0" to replay the full history
while True:
    entries = await redis.xread({"chat:stream": last_id}, block=0, count=10)
    for stream_name, messages in entries:
        for entry_id, data in messages:
            last_id = entry_id
            await websocket.send_text(data["msg"])
This pattern gives you durability without a full database.
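The durability win over plain pub/sub comes entirely from the last-seen-id contract: a consumer that remembers the id of the last entry it processed can replay everything it missed after a reconnect. A stdlib-only sketch of that contract (the MiniStream class is illustrative, not a Redis API):

```python
class MiniStream:
    """Append-only log mimicking the xadd/xread last-id contract."""

    def __init__(self):
        self._entries: list[tuple[int, str]] = []

    def xadd(self, msg: str) -> int:
        # Ids are monotonically increasing, like Redis stream entry ids
        entry_id = len(self._entries) + 1
        self._entries.append((entry_id, msg))
        return entry_id

    def xread(self, last_id: int) -> list[tuple[int, str]]:
        # Return every entry strictly newer than last_id
        return [e for e in self._entries if e[0] > last_id]


stream = MiniStream()
stream.xadd("hi")
stream.xadd("are you there?")
# A consumer that saw only the first entry replays the rest on reconnect
print(stream.xread(last_id=1))  # [(2, 'are you there?')]
```

With pub/sub, a disconnected client simply loses those messages; with a stream, they are waiting when it comes back.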
Testing your WebSocket logic
httpx.AsyncClient has no WebSocket support, but FastAPI's TestClient (built on httpx) handles both HTTP and WebSocket connections. Below is a concise pytest example.
# test_chat.py
from fastapi.testclient import TestClient

from main import app

client = TestClient(app)


def test_chat_flow():
    # Obtain a token via the login form
    resp = client.post("/login", data={"username": "alice", "password": "wonderland"})
    token = resp.json()["access_token"]
    # Connect the WebSocket and round-trip a message through Redis
    with client.websocket_connect(f"/ws/chat?token={token}") as ws:
        ws.send_text("Hello world")
        data = ws.receive_text()  # the broadcast echoed back via Redis
        assert "Hello world" in data
Running pytest spins up the FastAPI app in-process and verifies the end-to-end flow; note that a Redis server must be running for the broadcast to come back.
Pro tip: Use a separate Redis instance (or a different DB number) for tests to avoid contaminating production data.
Deploying to the cloud
For a quick production deployment you can use Render, Fly.io, or Railway—each provides a one‑click Redis add‑on. The Dockerfile we wrote works out of the box; just push the repo and let the platform build the image.
Don’t forget to set environment variables for SECRET_KEY and the Redis URL. Also enable HTTPS; browsers block insecure WebSocket connections on secure pages.
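A minimal sketch of that environment-driven configuration, assuming the variable names SECRET_KEY and REDIS_URL (use whatever names your platform conventions dictate):

```python
import os

# Fall back to dev defaults only when the variables are unset;
# in production, set both via the platform's secret manager.
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-only-change-me")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

print(REDIS_URL.startswith("redis://"))  # True
```

In auth.py and main.py you would then import these values instead of the hard-coded literals.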
Monitoring and observability
FastAPI integrates nicely with Prometheus. Install prometheus-fastapi-instrumentator and add a metrics endpoint.
pip install prometheus-fastapi-instrumentator
from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)
Now /metrics exposes request latency and count metrics that a Prometheus server can scrape. WebSocket connections aren't instrumented out of the box, so add a custom gauge (incremented on accept, decremented on disconnect) if you need to track them.
Conclusion
We’ve built a full‑stack real‑time chat app using FastAPI, WebSockets, and Redis, covered authentication, scaling, testing, and deployment. The same patterns apply to collaborative editors, live dashboards, or multiplayer game lobbies. Keep experimenting—swap Redis Streams for Kafka, add typing indicators, or integrate OAuth providers. Happy coding, and see you in the next Codeyaan tutorial!