Building a Real-Time Chat Application with FastAPI, WebSockets, and Redis
AI TOOLS Feb. 24, 2026, 11:30 a.m.

Welcome back, Codeyaan explorers! In today’s deep‑dive we’ll build a production‑ready real‑time chat application from scratch using FastAPI, WebSockets, and Redis. By the end of this tutorial you’ll understand how these pieces fit together, why they’re a perfect match for low‑latency messaging, and how to extend the pattern to other real‑time use cases like live dashboards or multiplayer games. Grab your favorite IDE, fire up a terminal, and let’s turn theory into a working prototype.

Why FastAPI, WebSockets, and Redis?

FastAPI is a modern, high‑performance web framework built on Starlette and Pydantic. It gives you async‑first routes, automatic OpenAPI docs, and a developer experience that feels like Flask with a turbo boost. WebSockets, on the other hand, provide full‑duplex communication over a single TCP connection, letting the server push data to the client without the overhead of repeated HTTP requests. Finally, Redis isn’t just a key‑value store; its Pub/Sub feature acts as a lightning‑fast message broker, enabling you to broadcast chat messages across multiple worker processes or even different machines.

When you combine these three, you get a stack that scales horizontally, remains responsive under load, and stays simple enough for a solo developer to maintain. In the next sections we’ll walk through each component, stitch them together, and end with a deployment‑ready setup.

Setting Up the Development Environment

First, ensure you have Python 3.11+ installed. Create a virtual environment to keep dependencies isolated:

python -m venv venv
source venv/bin/activate  # On Windows use `venv\Scripts\activate`

Next, install the core libraries:

pip install "fastapi[all]" uvicorn redis  # quotes stop zsh from globbing the brackets

We’ll also need aiofiles for serving static assets and python‑dotenv to manage environment variables. Install them with:

pip install aiofiles python-dotenv

Finally, spin up a local Redis instance. If you have Docker installed, the quickest way is:

docker run -d --name redis-chat -p 6379:6379 redis:7-alpine
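To confirm the container is accepting connections, ping it with the redis-cli bundled inside the image:

```shell
# Ask the Redis server inside the container to respond
docker exec redis-chat redis-cli ping
# Expected reply: PONG
```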

Creating the FastAPI Application

Let’s start with the minimal FastAPI skeleton. Create a file named main.py and add the following:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import os

app = FastAPI()

# Serve the simple HTML client from the "static" folder
app.mount("/static", StaticFiles(directory="static"), name="static")

We mount a /static route to serve our frontend later. For now, let’s add a health‑check endpoint so we can verify the server is up:

@app.get("/health")
async def health_check():
    return {"status": "ok"}

Run the app with Uvicorn to confirm everything works:

uvicorn main:app --reload

Navigate to http://127.0.0.1:8000/health – you should see {"status":"ok"}. If you do, great! We’re ready for the real‑time part.

Adding a WebSocket Endpoint

WebSocket routes in FastAPI are defined with the WebSocket class. Below we’ll create a simple endpoint that accepts connections, receives messages, and echoes them back. Add this to main.py:

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()


@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # In a real app you’d validate & store the message here
            await manager.broadcast(data)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

This ConnectionManager tracks all connected clients and pushes any received text to every client. It’s a “fan‑out” pattern perfect for a chat room where everyone sees every message.
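The fan-out is easy to see in isolation. The sketch below swaps the real WebSocket for a stub (FakeWebSocket and DemoManager are illustrative names of ours, not part of FastAPI) and shows one broadcast reaching every registered connection:

```python
import asyncio

class FakeWebSocket:
    """Stub standing in for fastapi.WebSocket; it just records what it is sent."""
    def __init__(self):
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        self.received.append(message)

class DemoManager:
    """Same fan-out loop as the tutorial's ConnectionManager, minus FastAPI."""
    def __init__(self):
        self.active_connections: list[FakeWebSocket] = []

    async def broadcast(self, message: str) -> None:
        for connection in self.active_connections:
            await connection.send_text(message)

async def main() -> None:
    manager = DemoManager()
    clients = [FakeWebSocket() for _ in range(3)]
    manager.active_connections.extend(clients)
    await manager.broadcast("hello")
    print([c.received for c in clients])  # prints [['hello'], ['hello'], ['hello']]

asyncio.run(main())
```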

Integrating Redis Pub/Sub for Scalability

The in‑memory manager works fine for a single‑process server, but as soon as you run multiple workers (or scale to multiple containers) each process has its own connection list. To keep all instances in sync we’ll replace the direct broadcast with a Redis Pub/Sub channel.

First, create a Redis client that can publish and subscribe asynchronously. Add the following near the top of main.py:

import asyncio
import redis.asyncio as aioredis

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
redis = aioredis.from_url(REDIS_URL, decode_responses=True)

CHANNEL_NAME = "chatroom"

Now modify the ConnectionManager to listen for messages from Redis and forward them to connected websockets. One subtlety: asyncio.create_task needs a running event loop, so the listener is launched from a startup hook rather than in __init__ (which runs at import time, before any loop exists):

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []
        self.redis_task: asyncio.Task | None = None

    async def start(self):
        # Called from the startup hook below, once the event loop is running
        self.redis_task = asyncio.create_task(self.redis_listener())

    async def redis_listener(self):
        pubsub = redis.pubsub()
        await pubsub.subscribe(CHANNEL_NAME)
        async for message in pubsub.listen():
            if message["type"] == "message":
                await self.broadcast(message["data"])

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

    async def publish(self, message: str):
        await redis.publish(CHANNEL_NAME, message)


@app.on_event("startup")
async def start_redis_listener():
    await manager.start()

Finally, update the WebSocket endpoint to publish incoming messages to Redis instead of directly broadcasting:

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.publish(data)  # Send to Redis, all workers will receive
    except WebSocketDisconnect:
        manager.disconnect(websocket)

With this change, any instance of the FastAPI app can handle WebSocket connections, and Redis guarantees every message reaches every client, regardless of which worker they’re attached to.
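The cross-worker behaviour can be illustrated without a server. The toy broker below (our own stand-in, not the redis-py API) mimics Pub/Sub with one asyncio queue per subscribed worker, so a single publish is delivered to all of them:

```python
import asyncio

class ToyPubSub:
    """In-process stand-in for Redis Pub/Sub: one publish is
    copied into every subscriber's queue."""
    def __init__(self):
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, message: str) -> None:
        for queue in self.subscribers:
            await queue.put(message)

async def main() -> None:
    broker = ToyPubSub()
    worker_a = broker.subscribe()   # pretend this is uvicorn worker 1
    worker_b = broker.subscribe()   # ...and worker 2
    await broker.publish("hi everyone")
    print(await worker_a.get(), await worker_b.get())  # prints: hi everyone hi everyone

asyncio.run(main())
```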

Building a Minimal Frontend

Let’s create a tiny HTML page that connects to our WebSocket endpoint and displays messages in real time. Inside a new folder called static, add index.html with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>FastAPI Chat</title>
    <style>
        body {font-family: Arial, sans-serif; margin: 2rem;}
        #messages {border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: .5rem;}
        #input {width: 80%; padding: .5rem;}
        #send {padding: .5rem 1rem;}
    </style>
</head>
<body>
    <h1>Realtime Chat</h1>
    <div id="messages"></div>
    <input id="input" type="text" placeholder="Type a message..." />
    <button id="send">Send</button>

    <script>
        const ws = new WebSocket(`ws://${location.host}/ws/chat`);
        const messagesDiv = document.getElementById('messages');
        const input = document.getElementById('input');
        const sendBtn = document.getElementById('send');

        ws.onmessage = (event) => {
            const msg = document.createElement('div');
            msg.textContent = event.data;
            messagesDiv.appendChild(msg);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        };

        sendBtn.onclick = () => {
            if (input.value) {
                ws.send(input.value);
                input.value = '';
            }
        };

        // Optional: send on Enter key
        input.addEventListener('keypress', (e) => {
            if (e.key === 'Enter') sendBtn.click();
        });
    </script>
</body>
</html>

Because we mounted /static earlier, the page is reachable at http://127.0.0.1:8000/static/index.html. Open two browser tabs, type messages, and watch them appear instantly in both windows.

Pro tip: When testing locally, use ws://localhost:8000/ws/chat in the script. In production, switch to wss:// (secure WebSockets) and ensure your reverse proxy forwards the upgrade headers.
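If nginx happens to be that reverse proxy (an assumption; adapt for Caddy or Traefik), forwarding the upgrade headers looks roughly like this:

```nginx
location /ws/ {
    proxy_pass http://127.0.0.1:8000;
    proxy_http_version 1.1;                  # WebSockets require HTTP/1.1
    proxy_set_header Upgrade $http_upgrade;  # pass the Upgrade: websocket header
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}
```

Without these directives the handshake degrades to a plain HTTP request and the connection fails.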

Persisting Chat History

Real‑time chat is fun, but most applications need a record of past conversations. Redis can act as a transient store, but for durability we’ll push each message into a PostgreSQL table. Install the async driver:

pip install asyncpg sqlalchemy

Define a simple SQLAlchemy model in a new file models.py:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String, DateTime, func

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/chatdb"

engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

Base = declarative_base()


class Message(Base):
    __tablename__ = "messages"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(String, nullable=False)
    timestamp = Column(DateTime(timezone=True), server_default=func.now())

Create the table with a quick script (use Alembic for production migrations). Because the engine is async, Base.metadata.create_all has to run through run_sync:

python -c "
import asyncio, models

async def init():
    async with models.engine.begin() as conn:
        await conn.run_sync(models.Base.metadata.create_all)

asyncio.run(init())
"

Now, back in main.py, import the session and store each incoming message before publishing it:

from models import AsyncSessionLocal, Message

async def save_message(content: str):
    async with AsyncSessionLocal() as session:
        msg = Message(content=content)
        session.add(msg)
        await session.commit()

Update the WebSocket loop to call save_message:

while True:
    data = await websocket.receive_text()
    await save_message(data)          # Persist
    await manager.publish(data)       # Broadcast

Finally, expose an endpoint to fetch the last 50 messages so new users see recent history (note the extra select import from SQLAlchemy):

from sqlalchemy import select

@app.get("/messages")
async def recent_messages(limit: int = 50):
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Message).order_by(Message.timestamp.desc()).limit(limit)
        )
        messages = [msg.content for msg in result.scalars()]
    return {"messages": messages[::-1]}  # Reverse to chronological order

Modify the frontend to load these on startup:

<script>
    async function loadHistory() {
        const resp = await fetch('/messages');
        const data = await resp.json();
        data.messages.forEach(msg => {
            const el = document.createElement('div');
            el.textContent = msg;
            messagesDiv.appendChild(el);
        });
        messagesDiv.scrollTop = messagesDiv.scrollHeight;
    }
    loadHistory();
    // existing WebSocket code follows...
</script>

Testing the Application

WebSocket routes are tested with Starlette's TestClient, which ships with FastAPI (httpx's AsyncClient has no WebSocket support, but TestClient uses httpx under the hood). Install the test dependencies:

pip install pytest httpx

Create a tests/test_chat.py file with the following example. Entering the TestClient context manager runs the startup hook, which our Redis listener needs:

from fastapi.testclient import TestClient
from main import app

def test_websocket_echo():
    with TestClient(app) as client:
        with client.websocket_connect("/ws/chat") as ws:
            ws.send_text("Hello, Test!")
            data = ws.receive_text()
            assert data == "Hello, Test!"

Run pytest -q with the Redis container from earlier still running. You should see the test pass, confirming that the WebSocket path accepts connections and correctly forwards messages through Redis.

Pro tip: When writing integration tests that involve Redis, spin up a temporary Redis container with pytest‑docker or use the fakeredis library to avoid polluting your development instance.

Deploying to Production

For production we recommend Dockerizing the entire stack. Below is a minimal Dockerfile for the FastAPI app:

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
ENV PORT=8000
EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
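The Dockerfile copies a requirements.txt, so make sure one exists listing everything installed during this tutorial (pin exact versions in a real project; the unpinned list below is just the package names):

```text
fastapi[all]
uvicorn
redis
aiofiles
python-dotenv
asyncpg
sqlalchemy
```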

And a docker-compose.yml that brings up FastAPI, Redis, and PostgreSQL together:

version: "3.9"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql+asyncpg://postgres:postgres@db/chatdb
    depends_on:
      - redis
      - db

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: chatdb
    ports:
      - "5432:5432"

Run docker compose up --build to start all three services together. Once the containers are healthy, open http://localhost:8000/static/index.html and start chatting.
