TL;DR: A software factory stitches together autonomous agents, event streams, and orchestrators into a repeatable production pipeline. By grounding the design in Kafka, Airflow (or Temporal), and observability best practices, teams can ship AI‑augmented features at scale with predictable reliability.
Modern product teams are no longer just writing code; they are curating behaviors that run continuously—data ingestion, model inference, validation, and rollout. Treating these behaviors as agents that exchange messages through a structured workflow turns ad‑hoc scripts into a disciplined, version‑controlled factory. This guide walks through the architectural pillars, concrete patterns, and production‑grade tooling you need to turn that vision into reality.
Why a Software Factory Matters
- Predictable throughput – By defining explicit stages (ingest → process → validate → deploy), you can measure cycle time and spot bottlenecks.
- Safety through isolation – Each agent runs in its own container or sandbox, limiting failure propagation.
- Continuous improvement – Versioned agents and pipelines let you A/B test new logic without disrupting the whole system.
- Observability as a first‑class citizen – Structured logs, metrics, and traces become inherent to the workflow, not an afterthought.
Enterprises such as Netflix, Uber, and Shopify already run “software factories” for recommendation engines, fraud detection, and personalization. They rely on a handful of battle‑tested components that we’ll replicate below.
Core Concepts
Agents and Tasks
An agent is a self‑contained service that performs a specific task when triggered by an event. Think of it as a micro‑service with a narrow responsibility:
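```python
# agent_example.py
import json
import os

from kafka import KafkaConsumer, KafkaProducer  # kafka-python client

# Required configuration, injected by the deployment (fails fast if missing).
INPUT_TOPIC = os.environ["INPUT_TOPIC"]
OUTPUT_TOPIC = os.environ["OUTPUT_TOPIC"]
BOOTSTRAP = os.environ["KAFKA_BOOTSTRAP"]

consumer = KafkaConsumer(
    INPUT_TOPIC,
    bootstrap_servers=BOOTSTRAP,
    group_id="agent-processor",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)


def enrich_message(msg):
    # Placeholder for real enrichment logic
    msg["enriched"] = True
    return msg


try:
    # Blocks indefinitely, yielding one ConsumerRecord per message.
    for record in consumer:
        enriched = enrich_message(record.value)
        producer.send(OUTPUT_TOPIC, enriched)
finally:
    # send() is asynchronous; flush buffered records before shutting down.
    producer.flush()
    consumer.close()
```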
The agent above reads from a Kafka topic, enriches each payload, and writes to another topic. It can be packaged as a Docker image and deployed via Kubernetes, giving us the isolation and scalability needed for production.
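To see the read-enrich-write loop without a running broker, here is a minimal sketch that swaps the Kafka topics for in-memory `queue.Queue` objects. The `run_agent` helper is hypothetical, and stopping when the queue is empty is a simplification of a consumer that has caught up with its log. The useful takeaway is that keeping the enrichment step a pure function lets you unit-test agent logic in isolation.

```python
from queue import Empty, Queue


def enrich_message(msg: dict) -> dict:
    # Same placeholder logic as the Kafka agent, but returns a new dict
    # instead of mutating the consumed record.
    return {**msg, "enriched": True}


def run_agent(input_topic: Queue, output_topic: Queue, handler) -> int:
    """Apply `handler` to every queued message and publish the result.

    Stops once the input queue is empty (a stand-in for a consumer that has
    caught up with its log) and returns the number of messages processed.
    """
    processed = 0
    while True:
        try:
            msg = input_topic.get_nowait()
        except Empty:
            return processed
        output_topic.put(handler(msg))
        processed += 1


raw_events, enriched_events = Queue(), Queue()
for i in range(3):
    raw_events.put({"event_id": f"evt-{i}", "payload": "sample"})

count = run_agent(raw_events, enriched_events, enrich_message)
print(count)  # 3
print(enriched_events.get_nowait())
# {'event_id': 'evt-0', 'payload': 'sample', 'enriched': True}
```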
Structured Workflows
A workflow stitches agents together using an orchestrator that guarantees ordering, retries, and conditional branching. Two popular choices:
| Orchestrator | Strengths | Typical Use‑Case |
|---|---|---|
| Apache Airflow | DAG‑centric, rich UI, extensible operators | Batch‑oriented ETL, nightly model training |
| Temporal.io | Strong fault‑tolerance, code‑first workflows, versioning | Real‑time, event‑driven pipelines with complex state |
Both expose a programmatic API, so you can version your workflow definition alongside your agents.
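To make "ordering, retries, and conditional branching" concrete, here is a toy in-process orchestrator. It is only a sketch of the semantics, not how Airflow or Temporal work internally (both persist state and dispatch tasks to separate workers). The `run_workflow` and `run_with_retries` helpers and the `skip_publish` flag are hypothetical names chosen for illustration.

```python
from typing import Callable

Stage = Callable[[dict], dict]


def run_with_retries(stage: Stage, data: dict, retries: int) -> dict:
    """Call `stage`, retrying up to `retries` extra times if it raises."""
    for attempt in range(retries + 1):
        try:
            return stage(data)
        except Exception:
            if attempt == retries:
                raise  # retry budget exhausted: surface the error
    raise AssertionError("unreachable")


def run_workflow(data: dict, stages: list, retries: int = 2) -> list:
    """Run (name, stage) pairs in order and return the names that executed."""
    executed = []
    for name, stage in stages:
        # Conditional branch: skip publishing when validation flagged the batch.
        if name == "publish" and data.get("skip_publish"):
            continue
        data = run_with_retries(stage, data, retries)
        executed.append(name)
    return executed


calls = {"enrich": 0}


def ingest(d):
    return {**d, "events": [1, 2, 3]}


def enrich(d):
    calls["enrich"] += 1
    if calls["enrich"] == 1:
        raise ConnectionError("transient failure")  # first attempt fails
    return {**d, "enriched": True}


def validate(d):
    return {**d, "skip_publish": not d["events"]}


def publish(d):
    return {**d, "published": True}


pipeline = [("ingest", ingest), ("enrich", enrich), ("validate", validate), ("publish", publish)]
print(run_workflow({}, pipeline))  # ['ingest', 'enrich', 'validate', 'publish']
print(calls["enrich"])  # 2 (one failure, one successful retry)
```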
Architecture of a Production Pipeline
Below is a reference architecture that scales from a single‑node prototype to a multi‑region production system.
```
+-------------------+      +--------------------+      +-------------------+
|   Data Sources    | ---> |  Kafka (Ingress)   | ---> |    Agent Pool     |
+-------------------+      +--------------------+      +-------------------+
                                                                 |
                                                                 v
                                                       +-------------------+
                                                       |    Temporal /     |
                                                       |    Airflow DAG    |
                                                       +-------------------+
                                                                 |
                                                                 v
                                                       +-------------------+
                                                       |    Persistence    |
                                                       |    (Postgres,     |
                                                       |     BigQuery)     |
                                                       +-------------------+
                                                                 |
                                                                 v
                                                       +-------------------+
                                                       |   Monitoring &    |
                                                       |     Alerting      |
                                                       |   (Prometheus,    |
                                                       |     Grafana)      |
                                                       +-------------------+
```
Data Ingestion Layer
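- Kafka acts as the durable event log. Configure a replication factor of at least 3 for production topics. For topics keyed by an entity or event ID, enable log compaction: Kafka then retains at least the latest record per key, so a replaying consumer can rebuild current state idempotently. See the official Kafka documentation for sizing guidelines.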
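```yaml
# kafka-config.yaml
# Broker-level defaults (server.properties keys written as YAML).
num.partitions: 12
default.replication.factor: 3        # broker key for the default topic replication factor
log.retention.hours: 168             # 7 days; only enforced by the "delete" policy
log.cleanup.policy: compact,delete   # compact per key and still expire old segments
```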
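To illustrate what compaction does to a keyed topic, here is a simplified model: it keeps only the newest record per key and treats a `None` value as a tombstone. Real Kafka compacts asynchronously, only in closed log segments, and removes tombstones after `delete.retention.ms`, so treat the hypothetical `compact` function as a mental model rather than an exact simulation.

```python
from typing import List, Optional, Tuple

# (key, value) pairs in log order; a value of None is a tombstone (delete marker).
Record = Tuple[str, Optional[dict]]


def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record per key, preserving the survivors' log order."""
    latest_offset = {key: offset for offset, (key, _) in enumerate(log)}
    return [rec for offset, rec in enumerate(log) if latest_offset[rec[0]] == offset]


log = [
    ("evt-1", {"status": "received"}),
    ("evt-2", {"status": "received"}),
    ("evt-1", {"status": "enriched"}),
    ("evt-2", None),  # tombstone: evt-2 was deleted
]
print(compact(log))
# [('evt-1', {'status': 'enriched'}), ('evt-2', None)]
```

Replaying the compacted log yields the same final state as replaying the full log, which is why compaction pairs well with keyed, last-value-wins updates.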
Orchestration Engine
Airflow DAG Example (Python)
```python
# dags/software_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-eng",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="software_factory_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",  # `schedule` supersedes the deprecated `schedule_interval` (Airflow 2.4+)
    default_args=default_args,
    catchup=False,
) as dag:
    ingest = BashOperator(
        task_id="ingest_events",
        bash_command="python -m agents.ingest --topic raw_events",
    )
    enrich = BashOperator(
        task_id="enrich_events",
        bash_command="python -m agents.enrich --in-topic raw_events --out-topic enriched_events",
    )
    validate = BashOperator(
        task_id="validate_events",
        bash_command="python -m agents.validate --in-topic enriched_events",
    )
    publish = BashOperator(
        task_id="publish_results",
        bash_command="python -m agents.publish --in-topic validated_events",
    )

    # Run the stages strictly in sequence.
    ingest >> enrich >> validate >> publish
```
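Airflow’s UI visualizes the DAG and surfaces task logs, and failure alerts can be routed to Slack through the Slack provider package and an `on_failure_callback`. Because each task must finish before the next one starts, the `agents.*` modules invoked here are assumed to process the messages currently available and then exit, unlike the long-running consumer loop in `agent_example.py` (kafka-python’s `consumer_timeout_ms` option is one way to stop iterating once a topic is drained). For real‑time use cases, Temporal’s workflow‑as‑code model starts workflows in response to events, so no batch scheduler is needed.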
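The `>>` chain declares a dependency graph, and the scheduler runs a task only after its upstream tasks succeed. Python’s standard `graphlib` module can reproduce the resulting order for this linear DAG, and a little arithmetic shows the retry budget that `default_args` implies. This illustrates the semantics only; it is not Airflow code, and it assumes Airflow’s default fixed `retry_delay` (no exponential backoff).

```python
from datetime import timedelta
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on,
# mirroring `ingest >> enrich >> validate >> publish`.
dependencies = {
    "enrich_events": {"ingest_events"},
    "validate_events": {"enrich_events"},
    "publish_results": {"validate_events"},
}
order = list(TopologicalSorter(dependencies).static_order())
print(order)
# ['ingest_events', 'enrich_events', 'validate_events', 'publish_results']

# Retry budget from default_args: retries=2 means up to 3 attempts per task.
retries, retry_delay = 2, timedelta(minutes=5)
max_attempts = retries + 1
# Worst-case time spent waiting between attempts if every task uses all retries.
worst_case_wait = retries * retry_delay * len(order)
print(max_attempts, worst_case_wait)  # 3 0:40:00
```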
Temporal Workflow Sketch (Go)
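```go
// workflow.go
package factory

import (
	"context"
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

// Placeholder payload types; replace them with your real schemas.
type Input struct{ Topic string }
type IngestResult struct{ Count int }
type EnrichResult struct{ Count int }
type Result struct{ Status string }

// Activity stubs; real implementations would invoke the ingest and enrich agents.
func IngestActivity(ctx context.Context, in Input) (IngestResult, error) {
	return IngestResult{}, nil
}

func EnrichActivity(ctx context.Context, in IngestResult) (EnrichResult, error) {
	return EnrichResult{}, nil
}

func SoftwareFactoryWorkflow(ctx workflow.Context, input Input) (Result, error) {
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 5 * time.Minute, // limit for each activity attempt
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval: 10 * time.Second,
			MaximumAttempts: 3, // first try plus two retries
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	var ingestResult IngestResult
	if err := workflow.ExecuteActivity(ctx, IngestActivity, input).Get(ctx, &ingestResult); err != nil {
		return Result{}, err
	}

	var enrichResult EnrichResult
	if err := workflow.ExecuteActivity(ctx, EnrichActivity, ingestResult).Get(ctx, &enrichResult); err != nil {
		return Result{}, err
	}

	// Continue with validation, publishing, etc.
	return Result{Status: "success"}, nil
}
```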
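Temporal persists each workflow’s event history, so a workflow resumes where it left off after worker or process restarts, and its logic effectively runs once. Activities, by contrast, are retried with at‑least‑once semantics, so they should be idempotent. This durability makes Temporal a good fit for financial or compliance‑critical pipelines. See the official Temporal docs for deeper semantics.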
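With the `RetryPolicy` above, Temporal waits `InitialInterval` before the first retry and multiplies the wait by `BackoffCoefficient` on each subsequent retry, capped at `MaximumInterval`. The Go sketch leaves those two fields unset, so the documented defaults apply: a coefficient of 2.0 and a cap of 100 × `InitialInterval`. The `retry_delays` helper below is a hypothetical Python model of that schedule for quick capacity planning. It is not part of any Temporal SDK.

```python
from typing import List, Optional


def retry_delays(initial_s: float, max_attempts: int, backoff: float = 2.0,
                 max_interval_s: Optional[float] = None) -> List[float]:
    """Seconds to wait before each retry under exponential backoff.

    Models Temporal's documented defaults (coefficient 2.0, cap of
    100 x the initial interval). `max_attempts` counts the first try and
    must be >= 1; Temporal's "0 = unlimited" is not modelled here.
    """
    if max_interval_s is None:
        max_interval_s = 100 * initial_s
    return [min(initial_s * backoff ** n, max_interval_s) for n in range(max_attempts - 1)]


print(retry_delays(10, 3))  # [10.0, 20.0] for InitialInterval=10s, MaximumAttempts=3

# Worst case per activity: every attempt runs into StartToCloseTimeout (5 min).
attempt_timeout_s = 5 * 60
worst_case_s = 3 * attempt_timeout_s + sum(retry_delays(10, 3))
print(worst_case_s)  # 930.0
```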
Persistence & Query Layer
- PostgreSQL for transactional state (e.g., workflow checkpoints). Use logical replication to feed a data warehouse.
- BigQuery or Snowflake for analytical queries on enriched event streams.
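Checkpoint writes should be idempotent, so a replayed stage overwrites its row instead of adding a duplicate. PostgreSQL’s `INSERT ... ON CONFLICT ... DO UPDATE` provides this. The sketch below uses the same upsert syntax against SQLite (3.24 or newer) purely so it runs without a database server. The `checkpoints` table and `save_checkpoint` helper are hypothetical. With a PostgreSQL driver such as psycopg, you would swap the `?` placeholders for `%s`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE checkpoints (
           workflow_id TEXT PRIMARY KEY,
           stage       TEXT NOT NULL,
           last_offset INTEGER NOT NULL
       )"""
)


def save_checkpoint(conn, workflow_id, stage, last_offset):
    # Upsert: insert a new row, or overwrite the existing row for this key.
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """INSERT INTO checkpoints (workflow_id, stage, last_offset)
               VALUES (?, ?, ?)
               ON CONFLICT (workflow_id) DO UPDATE
               SET stage = excluded.stage, last_offset = excluded.last_offset""",
            (workflow_id, stage, last_offset),
        )


save_checkpoint(conn, "wf-1", "enrich", 41)
save_checkpoint(conn, "wf-1", "validate", 42)
save_checkpoint(conn, "wf-1", "validate", 42)  # replayed write: still one row

rows = conn.execute("SELECT * FROM checkpoints").fetchall()
print(rows)  # [('wf-1', 'validate', 42)]
```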
Monitoring & Observability
| Concern | Tool | Example Signal |
|---|---|---|
| Logs | Loki + Grafana | JSON log lines selected by `{app="enrich-agent"}` |
| Traces | OpenTelemetry + Jaeger | End‑to‑end latency per event |
| Metrics | Prometheus | `agent_success_total{agent="enrich"}`, `kafka_consumer_lag{topic="raw_events"}` (lag metric names vary by exporter) |
| Alerts | Alertmanager | Fire when `kafka_consumer_lag` exceeds 1,000,000 messages |
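Instrument each agent with a structured (JSON) logger and expose a `/metrics` endpoint for Prometheus to scrape. The `prometheus_client` library, or the OpenTelemetry Python SDK with its Prometheus exporter, can serve that endpoint with a few lines of setup.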
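A structured logger needs nothing beyond the standard library. The sketch below shows one way to emit a single JSON object per line so that Loki can filter on fields. The `JsonFormatter` class and the `agent`/`event_id` field names are illustrative assumptions; libraries such as python-json-logger offer a ready-made equivalent.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    # Hypothetical context fields an agent might attach via `extra=`.
    CONTEXT_FIELDS = ("agent", "event_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        for key in self.CONTEXT_FIELDS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)


logger = logging.getLogger("enrich-agent")
handler = logging.StreamHandler()  # stderr; a container log collector ships it to Loki
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("message enriched", extra={"agent": "enrich", "event_id": "test-123"})
```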
Patterns in Production
- Idempotent Handlers – Design agents to safely reprocess the same message. Use deterministic keys (e.g., `event_id`) and upsert semantics.
- Circuit Breaker per Agent – Prevent cascading failures when downstream services become unavailable. Hystrix‑style libraries exist for Go, Java, and Python.
- Blue‑Green Deployment of Agents – Run a new version alongside the old one, shift partitions to it through Kafka consumer‑group rebalancing, and roll back quickly by scaling the old version back up if errors spike.
- Schema Evolution via Avro/Protobuf – Serialize messages with versioned schemas stored in a registry (e.g., Confluent Schema Registry) to avoid breaking downstream agents.
- Back‑Pressure Management – Leverage Kafka’s consumer lag metrics to throttle upstream producers, ensuring the system stays within its processing capacity.
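As an illustration of the circuit‑breaker pattern, here is a minimal, hypothetical `CircuitBreaker` class. Production code would more likely use an established library such as pybreaker. After `failure_threshold` consecutive failures, the breaker opens and rejects calls immediately. After `reset_timeout_s`, it moves to half‑open and lets calls through again: a success closes the circuit, and a failure reopens it. The injectable `clock` parameter is an assumption added so the behavior can be tested without sleeping.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the downstream service while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout_s=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout_s:
            return "half-open"
        return "open"

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit open: skipping downstream call")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call while half-open reopens the circuit immediately.
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success resets the breaker to closed.
        self.failures = 0
        self.opened_at = None
        return result


now = [0.0]  # fake clock for the demo
breaker = CircuitBreaker(failure_threshold=2, reset_timeout_s=10, clock=lambda: now[0])


def flaky():
    raise ConnectionError("downstream unavailable")


for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
print(breaker.state)  # open
now[0] = 10.0
print(breaker.state)  # half-open
print(breaker.call(lambda: "ok"), breaker.state)  # ok closed
```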
Implementation Walkthrough: Kafka + Temporal + Kubernetes
Below is a concise step‑by‑step guide you can reproduce in a sandbox environment.

1. Provision Kafka (using Helm on a local kind cluster):
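```shell
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka bitnami/kafka --set replicaCount=3
```

Value names such as `replicaCount` differ between chart versions, so check the chart’s `values.yaml` before installing.

2. Create Topics: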
```shell
POD=$(kubectl get pod -l app.kubernetes.io/name=kafka -o jsonpath="{.items[0].metadata.name}")
for topic in raw_events enriched_events; do
  kubectl exec -it "$POD" -- kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic "$topic" --partitions 12 --replication-factor 3
done
```

3. Build Agent Docker Image (example for the enrichment agent):
```dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY agent_example.py .
CMD ["python", "agent_example.py"]
```

Build and push:
```shell
docker build -t ghcr.io/yourorg/enrich-agent:latest .
docker push ghcr.io/yourorg/enrich-agent:latest
```

4. Deploy to Kubernetes (using a Deployment and a ServiceAccount that has access to the Kafka secret):
```yaml
# enrich-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: enrich-agent
spec:
  replicas: 4
  selector:
    matchLabels:
      app: enrich-agent
  template:
    metadata:
      labels:
        app: enrich-agent
    spec:
      serviceAccountName: enrich-agent  # assumed ServiceAccount with access to the Kafka secret
      containers:
        - name: enrich
          image: ghcr.io/yourorg/enrich-agent:latest
          env:
            - name: INPUT_TOPIC
              value: "raw_events"
            - name: OUTPUT_TOPIC
              value: "enriched_events"
            - name: KAFKA_BOOTSTRAP
              value: "kafka:9092"
```

5. Deploy Temporal Server (official Helm chart):
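```shell
helm repo add temporal https://charts.temporal.io
helm install temporal temporal/temporal
```

6. Register the Workflow: register the workflow and its activities on a Temporal worker (in the Go SDK, the worker’s `RegisterWorkflow` and `RegisterActivity` methods), then start the worker. Once it is running, trigger the workflow via an HTTP endpoint or a Kafka consumer that starts a workflow execution through the Temporal client.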
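7. Enable Observability – Install the Prometheus Operator, Prometheus, and Grafana via the `kube-prometheus-stack` Helm chart, and Loki via its own Grafana Helm chart (`kube-prometheus-stack` does not include Loki). Add scrape configs (for example, `ServiceMonitor` or `PodMonitor` resources) for the agent pods and Temporal metrics.

8. Run a Smoke Test: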
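```shell
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic raw_events <<EOF
{"event_id":"test-123","payload":"sample"}
EOF
```

Run the command from a pod inside the cluster, because the `kafka:9092` service name only resolves there. Verify that the enriched event appears on `enriched_events` and that the Temporal workflow completes successfully.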
Key Takeaways
- Agents + Orchestrator = Reusable, versioned pipelines; treat each micro‑service as a plug‑in rather than a monolith.
- Kafka provides the durable, append‑only backbone. Replication protects against broker loss, while compaction and schema evolution keep keyed topics compact and downstream consumers compatible.
- Choose the right orchestrator: Airflow for batch‑oriented DAGs, Temporal for event‑driven, stateful workflows.
- Observability is non‑negotiable; instrument logs, metrics, and traces from day one to catch latency spikes or data loss early.
- Production patterns (idempotency, circuit breakers, blue‑green deployments) turn experimental code into reliable services.
- Infrastructure as code (Helm, Terraform, Dockerfiles) ensures that the entire factory can be reproduced across environments.