TL;DR: Kafka Streams lets you compose real‑time pipelines directly in Java. Start with the high‑level DSL to prototype, then evolve the code into modular, fault‑tolerant topologies that scale on Kubernetes or bare metal. This post walks through the architectural patterns, code snippets, and operational tips you need to ship production‑ready stream processing.
Kafka Streams has become the go‑to library for teams that need low latency, exactly‑once semantics, and tight integration with Apache Kafka. Yet many engineers stop at the first “Hello World” example and never confront the engineering trade‑offs that arise when a prototype becomes a critical data pipeline. In this article we walk through the full lifecycle: writing a simple DSL topology, dropping down to the Processor API where you need low‑level control, wiring together reusable sub‑topologies, and handling state, scaling, and observability in a production environment.
Understanding the Kafka Streams DSL
The DSL is intentionally concise; a few lines can express joins, aggregations, and windowed computations. Below is a minimal example that reads a topic of click events, groups them by user, and emits a running count.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-counter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clicks = builder.stream("clicks");
KTable<String, Long> clickCounts = clicks
.groupBy((key, value) -> extractUserId(value))
.count(Materialized.as("user-click-counts"));
clickCounts.toStream().to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
- extractUserId is a tiny helper that parses the incoming JSON payload.
- The Materialized.as call creates a named, persistent state store that backs the count.
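extractUserId is not shown in the snippet. The following is a minimal sketch, assuming each click is a flat JSON object with a string field named "userId"; both that field name and the ClickParsing class are hypothetical. Production code would use a real JSON library such as Jackson rather than a regular expression.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls "userId" out of a flat JSON click event without a JSON library. */
public final class ClickParsing {
    // Matches "userId" : "<value>" with optional whitespace around the colon
    private static final Pattern USER_ID = Pattern.compile("\"userId\"\\s*:\\s*\"([^\"]*)\"");

    private ClickParsing() {}

    public static String extractUserId(String json) {
        Matcher m = USER_ID.matcher(json);
        // Returning null (instead of throwing) lets the grouping step drop a malformed record
        return m.find() ? m.group(1) : null;
    }
}
```

Returning null rather than throwing means a malformed click is skipped by the grouping step instead of crashing the stream thread; whether silently dropping it is acceptable depends on your data-quality requirements.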
When the DSL is enough
- Rapid prototyping – you can iterate on business logic without worrying about threading or state management.
- Stateless transformations – map, filter, flatMap work directly on the stream.
- Simple aggregations – count, reduce, and aggregate per key.
However, the DSL hides the underlying processor graph, which becomes a problem when you need:
- Cross‑topic joins with custom partitioning
- Fine‑grained control over state store lifecycles
- Side‑effects like external HTTP calls or database writes
Moving Toward the Processor API
The Processor API gives you access to the low‑level ProcessorContext, enabling custom state stores, punctuations, and non‑Kafka side‑effects. You can also mix the two styles: plug a custom processor into a DSL topology with KStream#process(), which is a common pattern for “hybrid” pipelines.
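import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
public class EnrichProcessor implements Processor<String, String, String, String> {
    // JDK HTTP client (java.net.http), reused for every lookup
    private final HttpClient httpClient = HttpClient.newHttpClient();
    private ProcessorContext<String, String> ctx;
    private KeyValueStore<String, String> cacheStore;
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.ctx = context;
        this.cacheStore = context.getStateStore("enrich-cache");
        // Schedule a punctuator to clean up stale cache entries every 5 minutes
        this.ctx.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // pseudo-code: iterate cacheStore.all() and delete entries older than a TTL
            // (requires storing a fetch timestamp alongside each cached value)
        });
    }
    @Override
    public void process(Record<String, String> record) {
        String enriched = enrich(record.value());
        ctx.forward(record.withValue(enriched));
    }
    private String enrich(String value) {
        // Serve from the local cache first
        String cached = cacheStore.get(value);
        if (cached != null) return cached;
        // Blocking HTTP lookup: it stalls this stream thread, so keep the timeout short (2 s is illustrative)
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                "https://api.example.com/lookup?key=" + URLEncoder.encode(value, StandardCharsets.UTF_8)))
            .timeout(Duration.ofSeconds(2))
            .GET()
            .build();
        try {
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) throw new IOException("Lookup returned HTTP " + response.statusCode());
            String result = response.body();
            cacheStore.put(value, result);
            return result;
        } catch (IOException e) {
            throw new RuntimeException("Lookup failed for key " + value, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Lookup interrupted for key " + value, e);
        }
    }
    @Override
    public void close() {}
}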
Hooking the processor into a topology
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("enrich-cache"),
Serdes.String(),
Serdes.String()
)
);
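KStream<String, String> raw = builder.stream("raw-events", Consumed.with(Serdes.String(), Serdes.String()));
// A process() call that returns a KStream (new Processor API) requires Kafka Streams 3.3 or later
raw.process(() -> new EnrichProcessor(), "enrich-cache")
.to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));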
- The processor runs after the raw source node, enriches each record, and forwards it downstream.
- The state store is declared once and attached to the processor via its name.
Architecture Patterns for Production‑Ready Pipelines
1. Modular Sub‑Topologies (a.k.a. “Topology as a Library”)
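Large organizations often treat a topology as a reusable library: a team publishes a small factory that builds its Topology, and other services embed it in their own KafkaStreams instance. Here that factory is a TopologyBuilder interface, which is application code rather than a Kafka Streams class.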
public interface TopologyBuilder {
Topology build(Properties appConfig);
}
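A concrete implementation might encapsulate a common “event‑sourcing” pattern. Event and EntityState are application classes, and JsonSerde is assumed to come from a JSON serde library such as Spring Kafka’s org.springframework.kafka.support.serializer.JsonSerde: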
public class EventSourcingTopology implements TopologyBuilder {
@Override
public Topology build(Properties props) {
StreamsBuilder sb = new StreamsBuilder();
// 1️⃣ Ingest raw domain events
KStream<String, Event> events = sb.stream("domain-events", Consumed.with(Serdes.String(), new JsonSerde<>(Event.class)));
// 2️⃣ Materialize per‑entity state
KTable<String, EntityState> state = events
.groupByKey()
.aggregate(
EntityState::new,
(key, event, agg) -> agg.apply(event),
Materialized.<String, EntityState, KeyValueStore<Bytes, byte[]>>as("entity-state-store")
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde<>(EntityState.class))
);
// 3️⃣ Emit snapshots for downstream consumers
state.toStream().to("entity-snapshots", Produced.with(Serdes.String(), new JsonSerde<>(EntityState.class)));
return sb.build(props); // passing the config lets settings such as topology.optimization take effect
}
}
- Benefit – Teams can version and test a sub‑topology independently, then compose them at runtime.
- Production tip – As a rough guideline, keep each sub‑topology under about 1 GB of state so that restoring it after a crash stays fast (see “State Store Replication and Standby Replicas” below).
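Kafka Streams has no API for merging two already-built Topology objects, so composing several TopologyBuilder implementations into one application is awkward. One alternative, sketched here under the assumption that you control the module interface, is to let each module contribute to a shared StreamsBuilder. TopologyModule, UppercaseModule, LengthModule, ComposedApp, and the topic names are all hypothetical.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

/** Hypothetical variant of TopologyBuilder: modules add to a shared builder instead of returning a Topology. */
interface TopologyModule {
    void addTo(StreamsBuilder builder);
}

final class UppercaseModule implements TopologyModule {
    public void addTo(StreamsBuilder builder) {
        builder.stream("names", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(v -> v.toUpperCase())
            .to("names-upper", Produced.with(Serdes.String(), Serdes.String()));
    }
}

final class LengthModule implements TopologyModule {
    public void addTo(StreamsBuilder builder) {
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(v -> (long) v.length())
            .to("word-lengths", Produced.with(Serdes.String(), Serdes.Long()));
    }
}

public final class ComposedApp {
    /** Builds one Topology from every module; unconnected modules become separate sub-topologies. */
    static Topology compose(Properties props, List<TopologyModule> modules) {
        StreamsBuilder builder = new StreamsBuilder();
        modules.forEach(m -> m.addTo(builder));
        return builder.build(props);
    }
}
```

Each module can still be unit-tested on its own builder. The trade-off is that modules now share one namespace, so two modules must not register the same store name or internal topic.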
2. Branch‑And‑Merge Pattern
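When you need to apply distinct processing branches (e.g., fraud detection vs. analytics) on the same input, split the stream with split() (which replaced the deprecated branch() in Kafka Streams 2.8) and, where branches should rejoin a common downstream path, recombine them with merge().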
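KStream<String, Transaction> tx = builder.stream("transactions", Consumed.with(Serdes.String(), new JsonSerde<>(Transaction.class)));
// split() replaces the deprecated branch(); map keys are the Named prefix plus each branch name
Map<String, KStream<String, Transaction>> branches = tx
    .split(Named.as("tx-"))
    .branch((key, value) -> value.getAmount() > 10_000, Branched.as("high-value"))
    .defaultBranch(Branched.as("regular"));
KStream<String, Transaction> highValue = branches.get("tx-high-value");
KStream<String, Transaction> regular = branches.get("tx-regular");
// High-value branch goes through a custom processor that calls a risk engine
// ("risk-store" must be registered with builder.addStateStore; add Produced serdes matching RiskProcessor's output)
highValue.process(() -> new RiskProcessor(), "risk-store")
    .to("high-value-alerts");
// Regular branch continues to aggregation; re-keying by account triggers a repartition, so give it explicit serdes
KTable<String, Double> dailyTotals = regular
    .groupBy((k, v) -> v.getAccountId(), Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
    .aggregate(
        () -> 0.0,
        (k, v, agg) -> agg + v.getAmount(),
        Materialized.with(Serdes.String(), Serdes.Double())
    );
// No window is applied, so this is a running total per account rather than a per-day figure
dailyTotals.toStream().to("daily-totals", Produced.with(Serdes.String(), Serdes.Double()));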
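- Why it matters – The expensive risk-engine call runs only on the small high-value slice, so most records skip it. Both branches still share the same stream threads, though: a slow call on one branch stalls the thread and delays the other records it handles. If the expensive branch needs its own resources, write it to an intermediate topic and process that topic in a separate application.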
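The snippet above sends its branches to separate topics and never calls merge(). For the full split-then-merge shape, here is a self-contained sketch using plain String values so it runs without domain classes; the "accountId,amount" value format, the REVIEWED/AUTO tags, and the transactions-audited topic are hypothetical stand-ins for the risk check.

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public final class BranchAndMergeSketch {
    // Hypothetical value format: "accountId,amount"
    static double amount(String value) {
        return Double.parseDouble(value.split(",", 2)[1]);
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> tx = builder.stream("transactions", Consumed.with(Serdes.String(), Serdes.String()));
        Map<String, KStream<String, String>> branches = tx
            .split(Named.as("tx-"))
            .branch((key, value) -> amount(value) > 10_000, Branched.as("high-value"))
            .defaultBranch(Branched.as("regular"));
        // Each branch gets its own treatment (a tag here; the risk check in a real pipeline)
        KStream<String, String> reviewed = branches.get("tx-high-value").mapValues(v -> v + ",REVIEWED");
        KStream<String, String> auto = branches.get("tx-regular").mapValues(v -> v + ",AUTO");
        // merge() recombines both branches into one stream for a shared downstream path
        reviewed.merge(auto)
            .to("transactions-audited", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

merge() gives no ordering guarantee across branches beyond per-partition processing order, so downstream consumers should not rely on records from different branches interleaving in any particular way.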
3. Exactly‑Once Semantics (EOS) End‑to‑End
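Kafka Streams provides exactly‑once semantics through the processing.guarantee config. EOS covers what happens inside Kafka: consuming input, updating state stores, and producing output commit atomically. Side effects outside Kafka, such as the HTTP call in EnrichProcessor, are not rolled back and may repeat after a failure. In production, also review the transaction timeout and commit interval: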
processing.guarantee=exactly_once_v2
transaction.timeout.ms=600000
commit.interval.ms=1000
- Real‑world note – In a 2023 Confluent case study, enabling EOS reduced duplicate alerts by 99.8% while adding only ~15 ms latency per record (Confluent blog).
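The same three settings can be applied in code through the StreamsConfig constants, which avoids typos in key names. A sketch; the values are the ones shown above, not universal recommendations, and EosConfig is a hypothetical helper class.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public final class EosConfig {
    private EosConfig() {}

    public static Properties eosProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // exactly_once_v2 is available from Kafka Streams 3.0 and needs brokers on 2.5 or newer
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // transaction.timeout.ms is a producer setting; the prefix scopes it to Streams' producers
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 600_000);
        // Each commit closes a transaction: 1000 ms trades latency for throughput vs. the 100 ms EOS default
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1_000);
        return props;
    }
}
```

The transaction timeout must not exceed the broker's transaction.max.timeout.ms, or the producer fails at startup.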
Scaling, Fault Tolerance, and State Management
Horizontal Scaling with Partition‑Aware Design
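Each instance of a Kafka Streams application processes a subset of the input partitions. Work is split into tasks, one per input partition of each sub-topology, and each task runs on exactly one stream thread. To scale horizontally:
- Align the number of stream threads per instance (num.stream.threads) with the number of CPU cores, as a starting point.
- Ensure the input topics have at least as many partitions as the total number of stream threads you plan to run across all instances; threads beyond the task count sit idle.
- Avoid single‑partition bottlenecks by choosing keys with many distinct, evenly active values. Kafka's default partitioner already hashes the serialized key, so sequential IDs such as userId spread well on their own and re-hashing them changes nothing. The real risk is hot keys (for example, one very active user) that concentrate traffic on one partition.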
num.stream.threads=4
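The partition and thread guidance above reduces to simple arithmetic. This back-of-the-envelope sketch assumes a stateless application with a single sub-topology; ParallelismCheck and the 12-partition, 4 × 4 deployment are hypothetical.

```java
import java.util.Arrays;

/** Back-of-the-envelope check: do the planned stream threads all get work? */
public final class ParallelismCheck {
    /** Tasks for one sub-topology = the largest partition count among its input topics. */
    static int taskCount(int... inputTopicPartitions) {
        return Arrays.stream(inputTopicPartitions).max().orElse(0);
    }

    /** Threads left without a task, given the task count of every sub-topology. */
    static int idleThreads(int instances, int threadsPerInstance, int... tasksPerSubtopology) {
        int totalTasks = Arrays.stream(tasksPerSubtopology).sum();
        return Math.max(0, instances * threadsPerInstance - totalTasks);
    }

    public static void main(String[] args) {
        // Hypothetical: one sub-topology reading a 12-partition topic,
        // deployed as 4 instances with num.stream.threads=4
        int tasks = taskCount(12);
        System.out.println("idle threads: " + idleThreads(4, 4, tasks)); // prints "idle threads: 4"
    }
}
```

Stateful topologies usually have more than one sub-topology (a groupBy adds a repartition step, for instance), so pass every sub-topology's task count; topology.describe() lists them.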
State Store Replication and Standby Replicas
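Kafka Streams offers standby replicas that keep a warm copy of each state store on another instance of the application, so you need at least num.standby.replicas + 1 instances for every replica to be placed. Promoting a standby only requires replaying the changelog records it has not yet applied, which reduces restore time after a failure.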
num.standby.replicas=2
- Rule of thumb – For stores larger than 500 MB, allocate at least two standby replicas. The restore time drops from minutes to seconds, as demonstrated in the “Kafka Streams at Uber” engineering post (Uber Engineering).
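To check restore times against your own workload before and after adding standbys, you can attach a StateRestoreListener. A minimal sketch; RestoreTimer is a hypothetical class, and it keys results by store name and changelog partition.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

/** Records how long each store partition takes to restore from its changelog. */
public class RestoreTimer implements StateRestoreListener {
    private final Map<String, Long> startNanos = new ConcurrentHashMap<>();
    private final Map<String, Duration> lastRestore = new ConcurrentHashMap<>();

    private static String id(TopicPartition tp, String storeName) {
        return storeName + "/" + tp.partition();
    }

    @Override
    public void onRestoreStart(TopicPartition tp, String storeName, long startingOffset, long endingOffset) {
        startNanos.put(id(tp, storeName), System.nanoTime());
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName, long batchEndOffset, long numRestored) {
        // Per-batch progress could be logged here; it is not needed for total timing
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        Long start = startNanos.remove(id(tp, storeName));
        if (start != null) {
            lastRestore.put(id(tp, storeName), Duration.ofNanos(System.nanoTime() - start));
        }
    }

    /** Snapshot of the most recent restore duration per "store/partition". */
    public Map<String, Duration> lastRestoreDurations() {
        return Map.copyOf(lastRestore);
    }
}
```

Register it with streams.setGlobalStateRestoreListener(new RestoreTimer()) before calling streams.start(); the listener cannot be set once the instance is running.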
RocksDB Tuning
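The default persistent store is RocksDB. Its options are not Kafka Streams configs, so they are applied in code rather than in the properties file. Production tuning points:
| RocksDB setting | Recommended value | Reason |
|---|---|---|
| Block cache size | 256 MB per application instance (JVM), shared across stores | Keeps hot keys in memory |
| Compaction style | Universal | Reduces write amplification for write‑heavy workloads |
| Max background jobs | 4 | Uses multiple CPU cores for flushes and compaction |
Set them in a RocksDBConfigSetter and register the class through StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG (rocksdb.config.setter):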
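import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*; // BlockBasedTableConfig, Cache, CompactionStyle, LRUCache, Options
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // One static 256 MB block cache shared by every store in this JVM
    private static final Cache BLOCK_CACHE = new LRUCache(256 * 1024 * 1024L);
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(BLOCK_CACHE); // hot blocks stay in memory
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(64 * 1024 * 1024L);
        options.setMaxBackgroundJobs(4);
        options.setCompactionStyle(CompactionStyle.UNIVERSAL);
    }
    @Override
    public void close(final String storeName, final Options options) {
        // The shared cache outlives individual stores, so nothing is closed here
    }
}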
Monitoring and Observability
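Kafka Streams exposes its metrics over JMX (and programmatically through KafkaStreams#metrics()). A common way to get them into Prometheus is to run the Prometheus JMX exporter as a Java agent alongside the application.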
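- Key metrics to watch
  - process-latency-avg / process-latency-max – processing latency per stream thread.
  - records-consumed-total vs. records-produced-total – topic-level throughput; compare their trends to catch stalls, keeping in mind that filters and aggregations legitimately change the ratio.
  - total-sst-files-size – on-disk size of each RocksDB store, which helps detect state bloat.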
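To read metrics from inside the application (for a health endpoint or a quick check in a test), KafkaStreams#metrics() returns the same metrics that are published over JMX. A sketch; the WATCHED names are an assumption based on thread-level latency, topic-level throughput, and RocksDB size metrics in recent Kafka versions, so check them against your version's metrics reference. WatchedMetrics is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

/** Sketch: pull a handful of watched metrics out of a KafkaStreams instance. */
public final class WatchedMetrics {
    // Assumed metric names; verify they exist in your Kafka version before alerting on them
    static final Set<String> WATCHED = Set.of(
            "process-latency-avg", "process-latency-max",
            "records-consumed-total", "records-produced-total",
            "total-sst-files-size");

    private WatchedMetrics() {}

    /** Keeps only watched metrics; keys keep their tags (thread, task, store, topic). */
    static Map<MetricName, Object> select(Map<MetricName, ? extends Metric> all) {
        Map<MetricName, Object> selected = new HashMap<>();
        all.forEach((name, metric) -> {
            if (WATCHED.contains(name.name())) {
                selected.put(name, metric.metricValue());
            }
        });
        return selected;
    }

    static Map<MetricName, Object> select(KafkaStreams streams) {
        return select(streams.metrics());
    }
}
```

Keeping the full MetricName as the key preserves tags such as thread-id and store name, so values from different threads or stores are not silently merged.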
Example Prometheus scrape config:
scrape_configs:
- job_name: 'kafka_streams'
static_configs:
- targets: ['streams-app:8000']
Add custom logging in processors for business‑level tracing:
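// ProcessorContext has no logger: `log` is an SLF4J Logger field, `start` is captured at the top of process()
log.info("Enriched record key={} latency={}ms", record.key(), System.currentTimeMillis() - start);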
Testing, Deployment, and CI/CD Integration
Unit Testing with TopologyTestDriver
The TopologyTestDriver lets you execute a topology against in‑memory topics.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Topology topology = new EventSourcingTopology().build(props);
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
TestInputTopic<String, Event> input = testDriver.createInputTopic(
"domain-events", Serdes.String().serializer(), new JsonSerde<>(Event.class).serializer());
TestOutputTopic<String, EntityState> output = testDriver.createOutputTopic(
"entity-snapshots", Serdes.String().deserializer(), new JsonSerde<>(EntityState.class).deserializer());
input.pipeInput("user-1", new Event("CREATE", ...));
EntityState result = output.readValue();
assertEquals("expected-state", result.getStatus());
}
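The same approach covers the click counter from the start of the article. This self-contained sketch assumes the kafka-streams-test-utils artifact is on the test classpath and, to avoid a JSON dependency, uses a hypothetical "userId,page" value format with its own stand-in extractUserId.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public final class ClickCounterTopologyTest {
    // Hypothetical value format for this sketch: "userId,page"
    static String extractUserId(String value) {
        return value.split(",", 2)[0];
    }

    static Topology clickCounter() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
                .groupBy((key, value) -> extractUserId(value), Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.as("user-click-counts"))
                .toStream()
                .to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    /** Pipes three clicks through the topology and returns the latest count per user. */
    static Map<String, Long> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-counter-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted by the driver
        try (TopologyTestDriver driver = new TopologyTestDriver(clickCounter(), props)) {
            TestInputTopic<String, String> clicks = driver.createInputTopic(
                    "clicks", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, Long> counts = driver.createOutputTopic(
                    "user-click-counts", Serdes.String().deserializer(), Serdes.Long().deserializer());
            clicks.pipeInput("alice,/home");
            clicks.pipeInput("bob,/cart");
            clicks.pipeInput("alice,/checkout");
            return counts.readKeyValuesToMap();
        }
    }
}
```

readKeyValuesToMap() keeps only the latest value per key, which suits a running count; use readKeyValuesToList() when the test needs to see every intermediate update.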
Integration Tests on Docker Compose
Spin up a Kafka cluster with docker-compose, run the application, and assert end‑to‑end behavior. Use the confluentinc/cp-kafka image for a quick setup.
docker-compose up -d kafka zookeeper
./gradlew integrationTest
CI/CD Pipelines
- Build – ./gradlew clean build (includes unit tests).
- Dockerize – docker build -t myorg/kafka-streams-app:$(git rev-parse --short HEAD) .
- Deploy – Use a Helm chart with replicaCount set to the desired parallelism. Example Helm values:
replicaCount: 3
streams:
applicationId: "click-counter"
bootstrapServers: "kafka:9092"
processingGuarantee: "exactly_once_v2"
resources:
limits:
cpu: "2"
memory: "4Gi"
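Blue‑green rollout – Deploy the new version with a distinct application.id (e.g., click-counter-v2) so it gets its own consumer group, state stores, and internal topics, and have it write to versioned output topics so the two versions never interleave results. The new version rebuilds its state from the input topics (limited by their retention), so wait until it has caught up, then switch downstream consumers to the new output topics. Rolling back means pointing those consumers at the old topics again, which the old version keeps populating until you retire it.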
Key Takeaways
- Start with the Kafka Streams DSL for rapid iteration, then migrate critical parts to the Processor API for fine‑grained control.
- Treat topologies as reusable libraries; compose them with modular sub‑topologies to keep codebases maintainable.
- Leverage branch‑and‑merge patterns to isolate high‑cost processing while preserving low latency for the bulk of traffic.
- Enable exactly‑once semantics and configure standby replicas to meet strict reliability SLAs.
- Tune RocksDB, monitor JMX/Prometheus metrics, and instrument processors for business‑level observability.
- Use TopologyTestDriver for unit tests, Docker Compose for integration tests, and Helm‑based blue‑green deployments for safe production rollouts.
Further Reading
- Apache Kafka Streams Documentation – official guide covering DSL, Processor API, and deployment patterns.
- Confluent Blog: Exactly‑Once Stream Processing – deep dive into EOS guarantees and performance trade‑offs.
- Uber Engineering: Scaling Kafka Streams at Uber – real‑world case study on partition design, state store sizing, and monitoring.