Apache Kafka 4.0: Event Streaming Guide
Apache Kafka 4.0 marks a significant evolution in the world of event streaming, blending high‑throughput messaging with powerful stream processing capabilities. Whether you’re building a real‑time analytics pipeline, a microservices backbone, or a data lake ingest layer, Kafka 4.0 offers the reliability and flexibility you need. In this guide we’ll walk through the core concepts, set up a simple producer‑consumer pair in Python, explore stream processing with Faust, and discuss real‑world patterns that can elevate your architecture.
Getting Started with Kafka 4.0
Before you dive into code, make sure you have a Kafka 4.0 broker up and running. Kafka 4.0 runs exclusively in KRaft mode (no ZooKeeper), and the quickest way to start a single-node broker is the official Apache Kafka Docker image:
docker run -d \
  --name kafka \
  -p 9092:9092 \
  apache/kafka:4.0.0
Once the broker is healthy, you can interact with it using the kafka-topics.sh CLI bundled in the image to create a topic for our examples.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic user-events \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092
Why Topics, Partitions, and Replication Matter
- Topics are logical channels that group related events (e.g., user-events).
- Partitions enable parallelism; each partition is an ordered log that can be consumed independently.
- Replication guarantees durability; a replicated partition survives broker failures.
Understanding these fundamentals will help you design a system that scales horizontally while staying resilient.
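If you prefer to manage topics from code, the confluent-kafka Python client (installed in the next section) includes an AdminClient. The sketch below mirrors the CLI example above and is a starting point rather than a full provisioning script:
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Create the same topic as the CLI example: 3 partitions, replication factor 1
futures = admin.create_topics([NewTopic('user-events', num_partitions=3, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()  # raises if creation failed (e.g., the topic already exists)
        print(f'Created topic {topic}')
    except Exception as e:
        print(f'Failed to create {topic}: {e}')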
Building a Simple Producer in Python
The confluent-kafka package, a thin Python wrapper around the high-performance librdkafka library, is a solid default choice of client. Install it with pip and start producing events.
pip install confluent-kafka
Below is a minimal producer that sends JSON‑encoded user activity events to the user-events topic.
import json
import time
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'event-producer'
}

producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def produce_user_event(user_id, action):
    event = {
        'user_id': user_id,
        'action': action,
        'timestamp': int(time.time() * 1000)
    }
    producer.produce(
        topic='user-events',
        key=str(user_id),
        value=json.dumps(event),
        callback=delivery_report
    )
    producer.poll(0)

if __name__ == '__main__':
    actions = ['login', 'view_product', 'add_to_cart', 'checkout']
    for i in range(1, 11):
        produce_user_event(user_id=i, action=actions[i % len(actions)])
        time.sleep(0.5)
    producer.flush()
This script demonstrates three key ideas: using a key for message ordering within a partition, serializing payloads as JSON, and handling asynchronous delivery callbacks.
Pro tip: For high-volume workloads, let librdkafka batch messages in the background instead of flushing after every send; reserve producer.flush() for shutdown or batch boundaries. Fewer, larger batches mean fewer network round-trips and higher throughput.
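Concretely, most of the throughput tuning happens in the producer configuration rather than in your send loop. The settings below are illustrative starting points, not recommended defaults:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 50,            # wait up to 50 ms so batches can fill
    'batch.size': 131072,       # allow larger per-partition batches (bytes)
    'compression.type': 'lz4',  # compress whole batches on the wire
    'acks': 'all'               # keep durability while batching
}
producer = Producer(conf)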
Consuming Events with a Python Consumer
Now let’s read the events back. The consumer will join a consumer group, which enables load‑balanced processing across multiple instances.
import json

from confluent_kafka import Consumer, KafkaException

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'event-consumers',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['user-events'])

def process_message(msg):
    event = json.loads(msg.value().decode('utf-8'))
    print(f"User {event['user_id']} performed {event['action']} at {event['timestamp']}")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process_message(msg)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
The consumer uses auto.offset.reset=earliest so it reads from the beginning of the log on first start. In production you’ll likely rely on committed offsets to resume exactly where you left off.
Handling Rebalances Gracefully
- Implement on_assign and on_revoke callbacks to manage state when partitions shift between instances (see the sketch below).
- Persist any in-flight processing state (e.g., database transactions) before a rebalance occurs.
- Set enable.auto.commit=False and commit offsets manually when you need tight control over delivery guarantees; exactly-once additionally requires transactions.
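A minimal sketch of such callbacks with confluent-kafka, reusing the group and topic from the earlier example (the commit-on-revoke logic is an assumption about where your processing state lives):
from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    # Called after a rebalance hands partitions to this instance
    print(f'Assigned: {partitions}')

def on_revoke(consumer, partitions):
    # Flush in-flight work, then commit whatever has been processed so far
    try:
        consumer.commit(asynchronous=False)
    except Exception:
        pass  # nothing consumed yet

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'event-consumers',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['user-events'], on_assign=on_assign, on_revoke=on_revoke)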
Pro tip: Combine the consumer with a local cache (e.g., Redis) to de‑duplicate events that might be replayed after a rebalance.
Stream Processing with Faust (Python)
Kafka's Streams API has offered exactly-once semantics (EOS) since well before 4.0, but it is Java-only; Python developers often prefer Faust, an open-source stream processing library that mimics the Kafka Streams DSL.
First, install Faust:
pip install faust
Below is a Faust application that enriches incoming user-events with a static user profile table and writes the result to a new topic enriched-events.
import faust
import json

app = faust.App(
    'enrichment-app',
    broker='kafka://localhost:9092',
    store='memory://',
    topic_partitions=3
)

# Define the input and output topics
raw_events = app.topic('user-events', value_type=bytes)
enriched_events = app.topic('enriched-events', partitions=3)

# In-memory table of user profiles (could be backed by RocksDB or Redis)
user_profiles = {
    1: {'name': 'Alice', 'tier': 'gold'},
    2: {'name': 'Bob', 'tier': 'silver'},
    3: {'name': 'Charlie', 'tier': 'bronze'}
}

class EnrichedEvent(faust.Record):
    user_id: int
    name: str
    tier: str
    action: str
    timestamp: int

@app.agent(raw_events)
async def enrich(events):
    async for raw in events:
        payload = json.loads(raw.decode('utf-8'))
        profile = user_profiles.get(payload['user_id'], {'name': 'unknown', 'tier': 'none'})
        enriched = EnrichedEvent(
            user_id=payload['user_id'],
            name=profile['name'],
            tier=profile['tier'],
            action=payload['action'],
            timestamp=payload['timestamp']
        )
        await enriched_events.send(value=enriched)

if __name__ == '__main__':
    app.main()
Faust automatically handles partition assignment, state management, and fault tolerance. The EnrichedEvent record is serialized using Faust’s built‑in schema, which reduces boilerplate compared to manual JSON handling.
Pro tip: For production workloads, replace the in-memory user_profiles dict with a Faust Table backed by RocksDB. This gives you on-disk durability and fast look-ups across restarts.
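A rough sketch of that swap, assuming the app object from the example above (the table name and partition count are illustrative):
# Changelog-backed table; run the app with store='rocksdb://' for on-disk state
user_profiles = app.Table(
    'user-profiles',
    default=dict,
    partitions=3
)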
Real‑World Use Cases
1. Clickstream Analytics
Online retailers capture every click, scroll, and purchase as an event. By funneling these events into Kafka, they can power dashboards that update in seconds, detect anomalies (e.g., sudden traffic spikes), and feed machine‑learning models that predict conversion likelihood.
- Producer: Front-end JavaScript libraries send page-view events to a clickstream topic.
- Processor: A Kafka Streams job aggregates events per user session, calculates dwell time, and writes aggregates to a session-metrics topic (see the Faust sketch below).
- Consumer: A BI tool (e.g., Superset) reads session-metrics and visualizes real-time KPIs.
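The processor step above is described as a Kafka Streams job; a roughly equivalent sketch in Faust, using a tumbling window to count page views per user (the topic name, window size, and field names are assumptions):
import json
from datetime import timedelta
import faust

app = faust.App('clickstream-agg', broker='kafka://localhost:9092')
clicks = app.topic('clickstream', value_type=bytes)

# Count page views per user in 60-second tumbling windows, retained for one hour
page_views = app.Table('page-views', default=int).tumbling(
    timedelta(seconds=60), expires=timedelta(hours=1)
)

@app.agent(clicks)
async def count_views(events):
    async for raw in events:
        event = json.loads(raw.decode('utf-8'))
        page_views[event['user_id']] += 1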
2. Microservices Event Bus
Instead of synchronous HTTP calls, microservices can emit domain events (e.g., OrderCreated, PaymentProcessed) to Kafka. Other services subscribe to the relevant topics, decoupling the producer from the consumer and enabling independent scaling.
- Order Service publishes order-events after persisting to its database.
- Inventory Service consumes order-events, reserves stock, and emits inventory-updated (a sketch of this service follows below).
- Notification Service listens to both streams and sends email/SMS alerts.
This pattern also simplifies rollback scenarios: replaying the order-events topic can rebuild state in a new environment.
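To illustrate the decoupling, here is a hypothetical Inventory Service that reacts to order-events and emits inventory-updated (field names such as order_id are assumptions about the event payload):
import json
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'inventory-service',
    'auto.offset.reset': 'earliest'
})
producer = Producer({'bootstrap.servers': 'localhost:9092'})
consumer.subscribe(['order-events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    order = json.loads(msg.value())
    # Reserve stock here, then publish the follow-up domain event
    producer.produce('inventory-updated', key=str(order['order_id']),
                     value=json.dumps({'order_id': order['order_id'], 'status': 'reserved'}))
    producer.poll(0)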
3. Event‑Sourced CQRS
Command Query Responsibility Segregation (CQRS) paired with event sourcing stores every state change as an immutable event. Kafka acts as the event log, while separate read‑model processors build materialized views for fast queries.
- Write model: Commands are validated and turned into events (e.g., AccountCredited).
- Event store: Kafka topics hold the chronological sequence of events.
- Read model: A ksqlDB or Kafka Streams job aggregates events into a relational view used by APIs.
Kafka's exactly-once semantics ensure that each event is applied exactly once, eliminating the duplicate balances that plague naive implementations.
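To make the read-model side concrete, here is a minimal, hypothetical fold that rebuilds account balances from the event log; in practice this logic would live in a ksqlDB query or a Streams topology:
# Rebuild current balances by replaying account events in order
balances = {}

def apply_event(event):
    # event is a dict such as {'account_id': 'A1', 'type': 'AccountCredited', 'amount': 100}
    delta = event['amount'] if event['type'] == 'AccountCredited' else -event['amount']
    balances[event['account_id']] = balances.get(event['account_id'], 0) + delta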
Advanced Kafka 4.0 Features
Exactly‑Once Semantics (EOS)
EOS ensures that a record is neither lost nor duplicated, even in the presence of failures. It works by combining idempotent producers, transactional writes, and consumer offset commits within the same transaction.
from confluent_kafka import Producer, KafkaException

conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-producer',
    'enable.idempotence': True
}

producer = Producer(conf)
producer.init_transactions()

def produce_transactional(messages):
    # `consumer` is the Consumer instance from the earlier example
    producer.begin_transaction()
    try:
        for key, value in messages:
            producer.produce('transactions', key=key, value=value)
        # Commit the consumer's offsets atomically with the produced records
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )
        producer.commit_transaction()
    except KafkaException:
        producer.abort_transaction()
        raise
On the consuming side, set isolation.level to read_committed so the consumer only sees records from committed transactions, completing the exactly-once chain.
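A minimal consumer configuration for that pattern (the group id is illustrative):
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'transactional-consumers',
    'isolation.level': 'read_committed',  # skip records from aborted transactions
    'enable.auto.commit': False           # offsets are committed via the producer transaction
}
consumer = Consumer(conf)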
KIP-500: Removing ZooKeeper
With Kafka 4.0, the KIP-500 effort is complete: ZooKeeper support is removed entirely and cluster metadata is managed by a built-in Raft consensus layer (KRaft). This simplifies cluster management, reduces operational overhead, and improves scalability.
- Dynamic broker membership without manual Zookeeper configuration.
- Faster leader elections and smoother rolling upgrades.
- Unified security model across the control plane.
Kafka 4.0 clusters always run in KRaft mode, so a fresh deployment only needs a KRaft configuration (sketched below); existing ZooKeeper-based clusters must be migrated to KRaft on a 3.x release before upgrading.
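For reference, a single-node KRaft configuration looks roughly like this; node IDs, ports, and paths are illustrative, and the metadata log must be formatted once with kafka-storage.sh before first start:
# server.properties for a combined broker+controller node (illustrative values)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/var/lib/kafka/data

# Generate a cluster ID and format the metadata log (run once per node)
bin/kafka-storage.sh format -t $(bin/kafka-storage.sh random-uuid) -c server.properties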
Tiered Storage
Tiered storage lets you offload older segment files to inexpensive object storage (e.g., S3) while keeping recent data on local SSDs. This dramatically reduces hardware costs for long‑term retention.
# Example broker config for tiered storage (KIP-405)
remote.log.storage.system.enable=true
# Kafka does not bundle a remote backend; point this at your chosen
# RemoteStorageManager plugin (the S3 class name below is illustrative)
remote.log.storage.manager.class.name=com.example.kafka.tiered.S3RemoteStorageManager
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
With tiered storage, you can retain data for months or years without exploding your disk footprint.
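Remote storage is then enabled per topic. A sketch with illustrative retention values (it assumes the broker-level plugin configuration above is already in place):
# Keep roughly one hour on local disk; older segments live in remote storage
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name user-events \
  --add-config remote.storage.enable=true,local.retention.ms=3600000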
Monitoring, Security, and Ops
Observability
Kafka exposes a rich set of metrics over JMX. Export key metrics such as BytesInPerSec, UnderReplicatedPartitions, and consumer lag to Prometheus to detect bottlenecks early.
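A common way to surface those metrics is to attach the Prometheus JMX exporter as a Java agent on the broker JVM; the jar path, config path, and port below are illustrative:
# Attach the JMX exporter before starting the broker
export KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=7071:/opt/kafka-jmx-exporter.yml"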
# prometheus.yml snippet
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['localhost:7071']   # JMX exporter endpoint from the agent above
Grafana dashboards built on these metrics provide instant visual insight into throughput, latency, and error rates.
Security Best Practices
- Enable TLS encryption for both inter‑broker and client‑to‑broker traffic.
- Use SASL/SCRAM for authentication; store credentials in a secret manager.
- Define ACLs per topic to enforce least-privilege access (see the example command below).
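The ACL bullet above translates into a kafka-acls command along these lines (the principal and group names are illustrative):
# Allow a read-only analytics principal to consume user-events
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:analytics \
  --operation Read --topic user-events --group analytics-consumers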
Confluent Platform layers role-based access control (RBAC) on top of Kafka for more granular policy management; Apache Kafka itself relies on ACLs.
Pro tip: Rotate SASL credentials regularly and automate the rollout using Kubernetes Secrets or HashiCorp Vault.
Rolling Upgrades
KIP‑500’s Raft controller enables seamless rolling upgrades. Follow these steps:
- Upgrade one broker at a time; verify that the cluster and controller quorum remain healthy before moving on.
- Monitor the active-controller metric (kafka.controller:type=KafkaController,name=ActiveControllerCount) to ensure a controller is elected at all times.
- After all brokers are upgraded, finalize the upgrade by bumping the cluster's metadata.version with the kafka-features tool.
Because metadata is now stored in the Raft log, there’s no dependency on a separate Zookeeper quorum, reducing upgrade complexity.
Testing Your Kafka Pipelines
Automated testing is essential for reliable streaming applications. Use testcontainers to spin up an isolated Kafka broker for integration tests.
import pytest
from confluent_kafka import Consumer, Producer
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope='module')
def kafka():
    with KafkaContainer() as container:
        yield container.get_bootstrap_server()

def test_produce_consume(kafka):
    # Round-trip a single message through the throwaway broker
    producer = Producer({'bootstrap.servers': kafka})
    producer.produce('test-topic', key='1', value='hello')
    producer.flush()
    consumer = Consumer({'bootstrap.servers': kafka, 'group.id': 'test-group',
                         'auto.offset.reset': 'earliest'})
    consumer.subscribe(['test-topic'])
    msg = consumer.poll(10.0)
    consumer.close()
    assert msg is not None and not msg.error() and msg.value() == b'hello'
For stream processing logic, each Faust agent exposes a test_context() helper that lets you feed synthetic events and assert output without a live cluster.
Best Practices Checklist
- Use keys for ordering and partition affinity.
- Prefer idempotent producers and enable EOS for critical pipelines.
- Leverage KRaft (KIP‑500) for new deployments.
- Implement proper consumer rebalance callbacks.
- Externalize configuration (e.g., via environment variables or ConfigMaps).
- Instrument with Prometheus and set alerts on consumer lag and under-replicated partitions.