Tech Tutorial - March 01 2026 233007
HOW TO GUIDES March 1, 2026, 11:30 p.m.

Welcome to our deep‑dive tutorial! Today we’ll walk through building a real‑time notification system using FastAPI, WebSockets, and Redis. By the end of this guide you’ll have a production‑ready prototype that pushes live alerts to browsers and mobile clients. Grab a cup of coffee, fire up your editor, and let’s get coding.

Why FastAPI, WebSockets, and Redis?

FastAPI is renowned for its speed, async‑first design, and automatic OpenAPI docs—perfect for modern APIs. WebSockets give us a persistent, bidirectional channel, eliminating the latency of repeated HTTP polling. Redis, with its Pub/Sub capabilities, acts as an ultra‑fast message broker that scales horizontally without much overhead.

Combining these three technologies yields a stack that’s both developer‑friendly and production‑ready. You can handle thousands of concurrent connections, push updates instantly, and keep the codebase clean and maintainable.

Core Concepts to Master

  • Async endpoints in FastAPI
  • WebSocket lifecycle (connect, receive, send, disconnect)
  • Redis Pub/Sub pattern for decoupled messaging
  • Graceful shutdown and connection cleanup

Setting Up the Development Environment

First, ensure you have Python 3.11+ installed. We’ll use uvicorn as the ASGI server and redis-py’s asyncio support for async Redis interaction (the standalone aioredis package was merged into redis-py and is no longer maintained). Create a fresh virtual environment and install the dependencies:

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install "fastapi[all]" uvicorn redis

Next, spin up a local Redis instance. If you have Docker, the quickest way is:

docker run -d --name redis-dev -p 6379:6379 redis:7-alpine

With the stack ready, we can start coding the API.

Creating the FastAPI Application

Open a new file called main.py. We’ll define the FastAPI app, a simple health‑check endpoint, and the WebSocket route.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis  # redis-py's asyncio API, successor to aioredis
import json

app = FastAPI()
redis = None  # Will be initialized on startup


@app.on_event("startup")
async def startup_event():
    global redis
    redis = await aioredis.from_url("redis://localhost", decode_responses=True)


@app.on_event("shutdown")
async def shutdown_event():
    await redis.close()


@app.get("/health")
async def health_check():
    return {"status": "ok"}

The startup_event connects to Redis once, and shutdown_event ensures a clean teardown. Now let’s add the WebSocket endpoint.

WebSocket Connection Lifecycle

from starlette.websockets import WebSocketState


@app.websocket("/ws/notifications")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    pubsub = redis.pubsub()
    await pubsub.subscribe("notifications")

    try:
        while True:
            # Wait up to one second for a message from Redis
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message:
                await websocket.send_text(message["data"])

            # Stop streaming once the client is no longer connected
            if websocket.client_state != WebSocketState.CONNECTED:
                break
    except WebSocketDisconnect:
        pass
    finally:
        await pubsub.unsubscribe("notifications")
        await pubsub.close()

This handler accepts the WebSocket, subscribes to the Redis channel “notifications”, and streams any incoming messages directly to the client. The loop also respects client disconnects, preventing orphaned subscriptions.

Pro tip: Use ignore_subscribe_messages=True to filter out the initial subscription confirmation that Redis sends. It keeps your payload clean.

Publishing Notifications from the Backend

In a real app, notifications could originate from various services—order processors, background jobs, or admin panels. For this tutorial, we’ll expose a simple HTTP POST endpoint that publishes a message to Redis.

@app.post("/notify")
async def publish_notification(payload: dict):
    """
    Expected JSON:
    {
        "title": "New Order",
        "message": "Order #12345 placed",
        "user_id": 42
    }
    """
    await redis.publish("notifications", json.dumps(payload))
    return {"detail": "Notification queued"}

Clients listening on /ws/notifications will instantly receive the JSON payload. Because the publishing is async, the API remains responsive even under heavy load.

Testing the End‑to‑End Flow

  1. Start the server: uvicorn main:app --reload
  2. Open a browser console and run:

    let ws = new WebSocket("ws://localhost:8000/ws/notifications");
    ws.onmessage = (e) => console.log("🔔", JSON.parse(e.data));

  3. In a separate terminal, send a notification:

    curl -X POST http://localhost:8000/notify -H "Content-Type: application/json" -d '{"title":"Alert","message":"Server rebooted","user_id":1}'

  4. Watch the browser console display the incoming alert instantly.

If you see the message, congratulations—you’ve built a functional real‑time notification system!

Pro tip: For production, secure the WebSocket endpoint with JWT authentication and restrict the Redis channel per user or group to avoid leaking private data.

Scaling Considerations

When you move beyond a single‑node dev environment, a few challenges emerge: connection count, message fan‑out, and fault tolerance. Let’s address each.

Connection count: Each WebSocket consumes a file descriptor. Nginx in front of uvicorn can proxy tens of thousands of connections, but you may need to raise OS limits (ulimit -n) and install uvicorn[standard], whose extras include uvloop, for extra speed.

Message fan‑out: If you have 10,000 clients, broadcasting a single Redis message to all of them can become a bottleneck. One strategy is to shard channels (e.g., notifications:region:us) and let each client subscribe only to relevant shards.
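
One way to make that sharding convention concrete is a tiny helper that builds channel names. This is a sketch: the shard_channel name and the region key are illustrative assumptions, not anything Redis prescribes.

```python
# Hypothetical helper: build sharded channel names such as
# notifications:region:us, so each client subscribes only to its shard.
def shard_channel(base: str, region: str) -> str:
    """Compose a region-sharded Pub/Sub channel name."""
    return f"{base}:region:{region}"

# A publisher targets a single shard:
#   await redis.publish(shard_channel("notifications", "us"), message)
# and each WebSocket handler subscribes to the shard for its client:
#   await pubsub.subscribe(shard_channel("notifications", client_region))
```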

Fault tolerance: Deploy Redis in a clustered mode with replication. FastAPI’s graceful shutdown hooks ensure connections are closed cleanly, but you should also implement reconnection logic on the client side to handle temporary network glitches.
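
That client-side reconnection logic could look like the following sketch in Python. The names are made up, and `connect` is whatever async WebSocket client factory you use (for example, websockets.connect from the third-party websockets package).

```python
import asyncio


def next_backoff(current: float, cap: float = 30.0) -> float:
    """Exponential backoff: double the delay, capped at `cap` seconds."""
    return min(current * 2, cap)


async def listen_with_retry(connect, url: str) -> None:
    """Reconnect whenever the socket drops, backing off between attempts.

    `connect` is an async context-manager factory, e.g. websockets.connect.
    """
    delay = 1.0
    while True:
        try:
            async with connect(url) as ws:
                delay = 1.0  # reset the backoff after a successful connect
                async for message in ws:
                    print("notification:", message)
        except OSError:
            await asyncio.sleep(delay)
            delay = next_backoff(delay)
```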

Horizontal Scaling with Multiple Workers

FastAPI works seamlessly with gunicorn + uvicorn.workers.UvicornWorker. Launching multiple workers spreads the load across CPU cores, while Redis remains the single source of truth for messages.

gunicorn main:app -k uvicorn.workers.UvicornWorker -w 4 --bind 0.0.0.0:8000

Each worker maintains its own Redis Pub/Sub connection, but thanks to Redis’s efficient publish mechanism, the overhead is minimal.

Adding User‑Specific Notifications

So far we broadcast to every connected client. In many apps, notifications are user‑specific. We’ll extend the system to support per‑user channels using a naming convention like notifications:user:{user_id}.

@app.websocket("/ws/notifications/{user_id}")
async def user_ws_endpoint(websocket: WebSocket, user_id: int):
    await websocket.accept()
    channel = f"notifications:user:{user_id}"
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)

    try:
        while True:
            msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if msg:
                await websocket.send_text(msg["data"])
    except WebSocketDisconnect:
        pass
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.close()

Now the publishing endpoint must target the correct user channel.

@app.post("/notify/{user_id}")
async def publish_user_notification(user_id: int, payload: dict):
    channel = f"notifications:user:{user_id}"
    await redis.publish(channel, json.dumps(payload))
    return {"detail": f"Notification sent to user {user_id}"}

Clients simply connect to /ws/notifications/42 to receive only their own alerts. This pattern scales nicely because Redis only forwards messages to subscribers of the specific channel.

Pro tip: Store a short‑lived token (e.g., a JWT) in the WebSocket query string and validate it inside the endpoint. That way you can enforce that user_id matches the authenticated identity.
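
To illustrate the validation step, here is a minimal stdlib stand-in that signs and checks a user-bound token with an HMAC. Every name below is hypothetical; a real deployment would use a proper JWT library such as PyJWT, an expiry claim, and a securely stored secret.

```python
import hashlib
import hmac

SECRET = b"change-me"  # placeholder secret, for illustration only


def sign_user_token(user_id: int) -> str:
    """Issue a token binding a user id to an HMAC-SHA256 signature."""
    sig = hmac.new(SECRET, str(user_id).encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"


def verify_user_token(token: str, user_id: int) -> bool:
    """Check the token's signature and that it matches the path's user_id."""
    try:
        claimed, sig = token.rsplit(".", 1)
        claimed_id = int(claimed)
    except ValueError:
        return False  # malformed token
    expected = hmac.new(SECRET, claimed.encode(), hashlib.sha256).hexdigest()
    return claimed_id == user_id and hmac.compare_digest(sig, expected)
```

Inside the WebSocket endpoint you would read the token with websocket.query_params.get("token") and close the socket with code 1008 (policy violation) when verification fails.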

Persisting Notification History

Live alerts are great, but users often want to view past notifications. A lightweight solution is to push each message into a Redis list (or a dedicated database) as it’s published.

async def store_notification(user_id: int, payload: dict):
    key = f"history:user:{user_id}"
    await redis.rpush(key, json.dumps(payload))
    # Trim to last 100 items to bound memory usage
    await redis.ltrim(key, -100, -1)

Update the publishing endpoint to call this helper:

@app.post("/notify/{user_id}")
async def publish_user_notification(user_id: int, payload: dict):
    channel = f"notifications:user:{user_id}"
    message = json.dumps(payload)
    await redis.publish(channel, message)
    await store_notification(user_id, payload)
    return {"detail": f"Notification sent to user {user_id}"}

Expose a GET endpoint for fetching the history:

@app.get("/history/{user_id}")
async def get_notification_history(user_id: int):
    key = f"history:user:{user_id}"
    raw = await redis.lrange(key, 0, -1)
    return [json.loads(item) for item in raw]

This approach keeps the system self‑contained; you can later swap Redis for a persistent store without changing the client‑side logic.

Front‑End Integration with React

Let’s quickly sketch how a React component would consume the WebSocket stream. The component establishes a connection on mount, updates local state with incoming messages, and cleans up on unmount.

import { useEffect, useState } from "react";

function Notifications({ userId }) {
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    const ws = new WebSocket(`ws://localhost:8000/ws/notifications/${userId}`);

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setMessages((prev) => [data, ...prev]);
    };

    ws.onerror = (err) => console.error("WebSocket error:", err);

    return () => {
      ws.close();
    };
  }, [userId]);

  return (
    <ul>
      {messages.map((msg, idx) => (
        <li key={idx}>
          <strong>{msg.title}:</strong> {msg.message}
        </li>
      ))}
    </ul>
  );
}

Notice the clean‑up function that closes the socket—essential for preventing memory leaks in long‑running SPAs.

Pro tip: Wrap the WebSocket logic in a custom hook (useWebSocket) to reuse it across multiple components and handle reconnection automatically.

Monitoring and Observability

Running a real‑time service demands visibility. FastAPI integrates nicely with Prometheus via starlette_exporter. Install it:

pip install starlette_exporter

Then register the middleware and expose a metrics endpoint:

from starlette_exporter import PrometheusMiddleware, handle_metrics

app.add_middleware(PrometheusMiddleware)
app.add_route("/metrics", handle_metrics)

Request metrics such as latency and status codes come for free; custom metrics such as active WebSocket connections or Redis publish latency can be added with prometheus_client and surfaced in Grafana dashboards. Pair this with structured logs (e.g., using loguru) to trace issues in production.
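
starlette_exporter instruments HTTP traffic out of the box, but counting live WebSocket connections takes a little hand-rolled bookkeeping. Here is a minimal sketch; the ConnectionTracker name is an assumption, and in practice you might mirror the counter into a prometheus_client Gauge.

```python
import contextlib


class ConnectionTracker:
    """Count active WebSocket connections.

    The `active` value could be mirrored into a Prometheus Gauge
    (via prometheus_client) so it shows up on /metrics.
    """

    def __init__(self) -> None:
        self.active = 0

    @contextlib.contextmanager
    def track(self):
        """Increment on entry, decrement on exit, even on errors."""
        self.active += 1
        try:
            yield
        finally:
            self.active -= 1


tracker = ConnectionTracker()
```

In the WebSocket endpoint, you would wrap the handler body in `with tracker.track():` right after `await websocket.accept()`.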

Security Hardening Checklist

  • Enforce TLS for both HTTP and WebSocket traffic.
  • Validate JWTs on every WebSocket connection.
  • Rate‑limit the /notify endpoint to prevent abuse.
  • Set Redis ACLs: separate users for publishing vs. subscribing.
  • Enable Redis AUTH and bind only to trusted interfaces.

Implementing these safeguards early saves you from costly retrofits later.
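
As one way to rate-limit /notify using the Redis instance you already run, here is a fixed-window limiter sketch. The allow_request helper and the FakeRedis stand-in are illustrative assumptions; production code might instead reach for a library such as slowapi.

```python
import asyncio


async def allow_request(r, key: str, limit: int = 10, window: int = 60) -> bool:
    """Fixed-window rate limiter: at most `limit` calls per `window` seconds.

    `r` is any client exposing async incr/expire, e.g. redis.asyncio.
    """
    bucket = f"ratelimit:{key}"
    count = await r.incr(bucket)
    if count == 1:
        # First hit in this window: start the expiry countdown.
        await r.expire(bucket, window)
    return count <= limit


class FakeRedis:
    """In-memory stand-in so the sketch runs without a Redis server."""

    def __init__(self):
        self.data = {}

    async def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]

    async def expire(self, key, ttl):
        pass  # TTL handling is omitted in the stand-in
```

In the endpoint you would key the limiter by client IP or user id and raise HTTPException(status_code=429) when allow_request returns False.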

Deploying to Production

Containerize the application with a lightweight python:3.11-slim base image. Below is a minimal Dockerfile.

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000

CMD ["gunicorn", "main:app", "-k", "uvicorn.workers.UvicornWorker", "-w", "4", "--bind", "0.0.0.0:8000"]

Push the image to your registry, then orchestrate with Docker Compose or Kubernetes. In Kubernetes, you’d typically create a Deployment with a Service of type LoadBalancer, and run Redis as its own Deployment or StatefulSet (or use a managed Redis service) — a per‑pod Redis sidecar would give each replica its own broker and break Pub/Sub across pods.

Testing Strategies

Automated tests ensure reliability as you iterate. Use pytest with httpx for async HTTP testing, and FastAPI’s TestClient for the WebSocket integration test (httpx alone does not speak WebSockets); install pytest-asyncio for the async test support.

import json

import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient
from main import app

@pytest.mark.asyncio
async def test_health():
    async with AsyncClient(app=app, base_url="http://test") as client:
        resp = await client.get("/health")
        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"

def test_notification_flow():
    # Use TestClient as a context manager so startup/shutdown events run.
    with TestClient(app) as client:
        # Subscribe first: Redis Pub/Sub does not replay missed messages.
        with client.websocket_connect("/ws/notifications/1") as ws:
            payload = {"title": "Test", "message": "Hello", "user_id": 1}
            client.post("/notify/1", json=payload)
            assert json.loads(ws.receive_text()) == payload

Running these tests in CI/CD pipelines catches regressions before they reach users.

Performance Benchmarks

On a modest EC2 t3.medium (
