Building a Real-Time Data Dashboard with FastAPI, WebSockets, and Plotly
HOW TO GUIDES Feb. 20, 2026, 11:30 p.m.

Welcome to today’s deep‑dive tutorial! We’ll walk through building a fully‑featured real‑time data dashboard using FastAPI, WebSockets, and Plotly. By the end of this guide you’ll have a production‑ready backend that streams live metrics, a sleek JavaScript front‑end that visualizes them, and a handful of tricks to keep your stack performant and secure.

Why Real‑Time Dashboards Matter

In modern SaaS products, stakeholders demand instant insight into system health, user activity, and business KPIs. Traditional polling approaches waste bandwidth and introduce latency, especially when you need sub‑second updates. WebSockets solve this by establishing a persistent, full‑duplex channel where the server can push data the moment it arrives.

FastAPI’s async‑first design makes it a natural fit for handling thousands of concurrent WebSocket connections with minimal overhead. Pair that with Plotly’s declarative charting library, and you get a responsive UI that feels snappy even under heavy load.

Core Concepts Covered

  • Setting up FastAPI with ASGI and WebSocket routes
  • Streaming JSON payloads efficiently
  • Integrating Plotly.js for dynamic visualizations
  • Running background tasks with asyncio and FastAPI lifespans
  • Production‑grade tips: CORS, authentication, and scaling

Project Skeleton

Before we dive into code, let’s outline the folder structure. Keeping a clean layout helps both development and future onboarding.

real_time_dashboard/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI entry point
│   ├── websocket.py     # WebSocket endpoint logic
│   └── metrics.py       # Simulated data source
├── static/
│   └── index.html       # Front‑end page with Plotly
├── requirements.txt
└── README.md

All Python code lives under app/, while static assets (HTML, CSS, JS) reside in static/. This separation keeps development tidy and makes deployment straightforward, whether you ship a Docker image or serve the static assets from a CDN.

Setting Up the FastAPI Backend

First, install the dependencies. FastAPI, Uvicorn (the ASGI server), and the websockets library are the core pieces.

# requirements.txt
fastapi==0.110.0
uvicorn[standard]==0.27.0
websockets==12.0
pydantic==2.5.2

Now create app/main.py. This file boots the application, registers the WebSocket router, and serves static files.

# app/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

from .websocket import router as ws_router


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the background metric generator on startup...
    from . import metrics
    await metrics.start_generator(app)
    yield
    # ...and cancel it cleanly on shutdown
    app.state.metric_task.cancel()


app = FastAPI(title="Real-Time Dashboard", lifespan=lifespan)

# Register WebSocket routes *before* the catch-all static mount,
# otherwise the mount at "/" would shadow /ws/metrics
app.include_router(ws_router)

# Serve the front-end; the path is relative to the project root,
# where uvicorn is launched
app.mount("/", StaticFiles(directory="static", html=True), name="static")

The lifespan handler spins up a background coroutine that simulates incoming metrics and cancels it cleanly on shutdown. In a real product you'd replace the simulator with a connection to a time-series DB or an external API.
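With main.py in place you can already boot the server from the project root; the --reload flag restarts the worker on code changes, which is convenient during development.

uvicorn app.main:app --reload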

WebSocket Endpoint Logic

Let’s implement the actual WebSocket handler in app/websocket.py. The endpoint will accept connections, register them, and broadcast JSON payloads whenever new data arrives.

# app/websocket.py
import json
from typing import List

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active_connections.append(ws)

    def disconnect(self, ws: WebSocket):
        if ws in self.active_connections:
            self.active_connections.remove(ws)

    async def broadcast(self, message: dict):
        data = json.dumps(message)
        # Iterate over a copy so dead connections can be dropped mid-loop
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception:
                self.disconnect(connection)

manager = ConnectionManager()

@router.websocket("/ws/metrics")
async def websocket_endpoint(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            # Keep the connection alive; clients may send pings
            await ws.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(ws)

The loop above simply waits for any incoming text (e.g., ping messages) to keep the connection alive. All outbound data is funneled through manager.broadcast, which also prunes connections that fail mid-send, ensuring every healthy client receives the same update.

Simulating Live Metrics

For demonstration purposes we’ll generate random CPU and memory usage values every second. The generator runs as a background task and pushes data to the ConnectionManager.

# app/metrics.py
import asyncio
import random

from .websocket import manager

async def generate_metrics():
    loop = asyncio.get_running_loop()
    while True:
        metric = {
            "timestamp": loop.time(),  # monotonic seconds, handy for an x-axis
            "cpu": round(random.uniform(10, 90), 2),
            "memory": round(random.uniform(200, 800), 2)  # MB
        }
        await manager.broadcast(metric)
        await asyncio.sleep(1)  # Emit every second

async def start_generator(app):
    # Store the task so the lifespan handler can cancel it on shutdown
    app.state.metric_task = asyncio.create_task(generate_metrics())

Notice the use of asyncio.create_task inside the lifespan handler. This pattern lets the coroutine run in the background without blocking the main event loop, and the stored task handle makes a clean shutdown possible.

Pro tip: In production, replace the random generator with a proper data source and consider using asyncio.Queue to decouple producers and consumers, improving back‑pressure handling.
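Here is a minimal sketch of that queue pattern, reusing the tutorial's manager. The fetch_reading helper is a hypothetical stand-in for whatever real source you poll, and the bounded maxsize is what provides back-pressure: put() suspends once the queue is full.

# queue_pipeline.py (illustrative sketch)
import asyncio
import random

from app.websocket import manager  # the tutorial's ConnectionManager

async def fetch_reading() -> dict:
    # Hypothetical stand-in for a real data source (DB poll, MQ consumer, ...)
    await asyncio.sleep(0.1)
    return {"cpu": round(random.uniform(10, 90), 2)}

async def producer(queue: asyncio.Queue):
    while True:
        # put() suspends when the queue is full, throttling the producer
        await queue.put(await fetch_reading())

async def consumer(queue: asyncio.Queue):
    while True:
        reading = await queue.get()
        await manager.broadcast(reading)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=100)  # bounded queue => back-pressure
    await asyncio.gather(producer(queue), consumer(queue))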

Front‑End: Consuming the Stream with Plotly

The front-end lives in static/index.html. It opens a WebSocket connection, buffers the last 30 data points, and updates a live line chart. Plotly's newPlot and update APIs make this straightforward.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Real‑Time Dashboard</title>
    <script src="https://cdn.plot.ly/plotly-2.24.1.min.js"></script>
    <style>
        body { font-family: Arial, sans-serif; margin: 2rem; }
        #chart { width: 100%; height: 500px; }
    </style>
</head>
<body>
    <h1>Live System Metrics</h1>
    <div id="chart"></div>

    <script>
        const proto = location.protocol === 'https:' ? 'wss' : 'ws';
        const ws = new WebSocket(`${proto}://${location.host}/ws/metrics`);
        const maxPoints = 30;
        const timestamps = [];
        const cpuVals = [];
        const memVals = [];

        const layout = {
            title: 'CPU & Memory Usage',
            xaxis: { title: 'Time (s)' },
            yaxis: { title: 'Value', rangemode: 'tozero' },
            legend: { orientation: 'h' }
        };

        const data = [
            { x: timestamps, y: cpuVals, name: 'CPU %', mode: 'lines', line: {color: '#ff4136'} },
            { x: timestamps, y: memVals, name: 'Memory (MB)', mode: 'lines', line: {color: '#0074d9'} }
        ];

        Plotly.newPlot('chart', data, layout);

        ws.onmessage = (event) => {
            const msg = JSON.parse(event.data);
            const t = (msg.timestamp).toFixed(2);
            timestamps.push(t);
            cpuVals.push(msg.cpu);
            memVals.push(msg.memory);

            // Keep only the latest maxPoints entries
            if (timestamps.length > maxPoints) {
                timestamps.shift();
                cpuVals.shift();
                memVals.shift();
            }

            Plotly.update('chart', {
                x: [timestamps, timestamps],
                y: [cpuVals, memVals]
            });
        };

        ws.onclose = () => {
            console.warn('WebSocket closed – attempting reconnection...');
            // Simple reconnection logic
            setTimeout(() => location.reload(), 3000);
        };
    </script>
</body>
</html>

The script establishes a WebSocket connection to /ws/metrics, parses each JSON payload, and pushes the values into three parallel arrays. Plotly’s update call redraws the chart efficiently without a full re‑render.

Handling Network Glitches

WebSocket connections can drop due to network hiccups or server restarts. The onclose handler above demonstrates a minimal reconnection strategy: wait three seconds and reload the page. In a production UI you’d likely implement exponential back‑off and UI notifications instead of a hard reload.
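For reference, here is the same back-off idea sketched as a small headless consumer in Python, using the websockets package already pinned in requirements.txt; the URL and the 30-second cap are placeholder choices. In the browser you'd wrap the WebSocket constructor in an equivalent retry loop instead of reloading the page.

# reconnect_client.py (illustrative sketch)
import asyncio

import websockets

async def consume(url: str):
    delay = 1
    while True:
        try:
            async with websockets.connect(url) as ws:
                delay = 1  # reset the back-off after a successful connect
                async for message in ws:
                    print(message)  # handle the metric payload here
        except (OSError, websockets.ConnectionClosed):
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # double the wait, capped at 30 s

asyncio.run(consume("ws://localhost:8000/ws/metrics"))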

Securing the Data Stream

Open WebSocket endpoints are attractive attack vectors. Let’s add a lightweight token‑based authentication layer. We’ll use a query‑string token that the server validates before accepting the connection.

# app/websocket.py (updated snippet)
from fastapi import Depends, WebSocketException, status

VALID_TOKENS = {"secret-token-123", "dev-token-456"}

def get_token(ws: WebSocket):
    token = ws.query_params.get("token")
    if token not in VALID_TOKENS:
        # HTTPException doesn't apply to WebSockets; close with 1008 (policy violation)
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION,
                                 reason="Invalid token")
    return token

@router.websocket("/ws/metrics")
async def websocket_endpoint(ws: WebSocket, token: str = Depends(get_token)):
    await manager.connect(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(ws)

Clients now need to append ?token=secret-token-123 to the URL. The server checks the token against an in‑memory whitelist; for larger deployments you’d integrate with OAuth2 or JWT validation.
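As an illustrative sketch of the JWT route, assuming the PyJWT package and an HS256 shared secret (the secret value and module layout are placeholders):

# app/auth.py (illustrative; requires: pip install pyjwt)
import jwt
from fastapi import WebSocket, WebSocketException, status

SECRET_KEY = "change-me"  # placeholder; load from configuration in practice

def get_token(ws: WebSocket) -> dict:
    token = ws.query_params.get("token", "")
    try:
        # Raises if the signature is invalid or the token has expired
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION,
                                 reason="Invalid token")

Swap this get_token into the Depends call above and the endpoint itself stays unchanged.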

Pro tip: Always serve WebSockets over wss:// in production. Browsers block insecure WebSocket connections from HTTPS pages, and TLS protects token leakage.

Scaling to Hundreds of Thousands of Connections

Uvicorn serves FastAPI from a single process by default. To handle massive concurrency you'll want to deploy multiple workers behind a reverse proxy (e.g., Nginx) and use a message broker like Redis to broadcast metrics across workers.

  • Step 1 – Deploy with Uvicorn workers: uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
  • Step 2 – Introduce Redis Pub/Sub: Each worker publishes generated metrics to a Redis channel; all workers subscribe and forward to their local connections.
  • Step 3 – Horizontal scaling: Containerize the app (Docker) and orchestrate with Kubernetes, using a Service of type LoadBalancer to distribute traffic.

Below is a concise example of swapping the in‑process ConnectionManager for a Redis‑backed broadcaster.

# app/websocket_redis.py (illustrative)
import json

from fastapi import APIRouter, WebSocket
from redis import asyncio as aioredis  # aioredis now lives in redis-py

router = APIRouter()
REDIS_CHANNEL = "metrics_stream"

class RedisManager:
    def __init__(self, redis):
        self.redis = redis
        self.connections = set()

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.add(ws)

    def disconnect(self, ws: WebSocket):
        self.connections.discard(ws)

    async def broadcast(self, message: dict):
        # Publish once; every worker's listener fans the message out locally
        await self.redis.publish(REDIS_CHANNEL, json.dumps(message))

    async def listen(self):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(REDIS_CHANNEL)
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                data = msg["data"]
                for ws in list(self.connections):
                    try:
                        await ws.send_text(data)
                    except Exception:
                        self.disconnect(ws)

# In the lifespan handler:
# redis = aioredis.from_url("redis://redis:6379", decode_responses=True)
# manager = RedisManager(redis)
# asyncio.create_task(manager.listen())

With this pattern, any worker that receives a new metric publishes it once, and all workers forward it to their local clients. This eliminates duplicate work and keeps latency low.

Testing Your Dashboard

Automated testing for WebSockets is easiest with FastAPI's TestClient, which supports websocket_connect out of the box (httpx's AsyncClient does not speak WebSockets). Below is a minimal test that verifies the server pushes a well-formed JSON payload after the generator starts.

# tests/test_websocket.py
import json

from fastapi.testclient import TestClient

from app.main import app

def test_metrics_stream():
    # Entering the context manager runs the lifespan, starting the generator
    with TestClient(app) as client:
        # Include the token required by the auth layer added above
        with client.websocket_connect("/ws/metrics?token=secret-token-123") as ws:
            # Receive the first broadcast metric (emitted within a second)
            payload = json.loads(ws.receive_text())
            assert "timestamp" in payload
            assert "cpu" in payload
            assert "memory" in payload
            assert 0 <= payload["cpu"] <= 100

Running pytest -q should pass quickly, confirming that the WebSocket pipeline works end‑to‑end.

Deploying to Production

When you’re ready to ship, containerize the app. The Dockerfile below uses a multi‑stage build to keep the final image lightweight.

# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

FROM python:3.12-slim
WORKDIR /app
# Copy installed packages AND console scripts (the uvicorn binary) from the builder
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY app/ ./app/
COPY static/ ./static/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Push the image to your registry, then spin it up behind an Nginx reverse proxy that terminates TLS and forwards /ws/ upgrades to the FastAPI container.
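On the Nginx side, WebSocket traffic needs explicit upgrade headers. A minimal location block might look like the following sketch; the upstream address dashboard:8000 is an assumption, so substitute your container or service name.

# nginx.conf (illustrative snippet)
location /ws/ {
    proxy_pass http://dashboard:8000;          # assumed upstream address
    proxy_http_version 1.1;                    # WebSockets require HTTP/1.1
    proxy_set_header Upgrade $http_upgrade;    # forward the upgrade handshake
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;                  # keep long-lived streams open
}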

Monitoring & Observability

Even the most robust real-time system benefits from observability. Instrument the FastAPI app with Prometheus metrics and expose a /metrics endpoint using prometheus-fastapi-instrumentator. Track connection counts, message rates, and error rates to spot bottlenecks before users do.
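A minimal setup, assuming the prometheus-fastapi-instrumentator package is installed, takes two lines in app/main.py:

# app/main.py (addition; requires: pip install prometheus-fastapi-instrumentator)
from prometheus_fastapi_instrumentator import Instrumentator

# Collect default request metrics and expose them at /metrics
Instrumentator().instrument(app).expose(app)

The default instrumentation covers HTTP traffic; for WebSocket-specific series such as live connection counts, register your own gauges with prometheus_client.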
