TL;DR: Luigi lets you express complex data pipelines as a directed acyclic graph of Python tasks, and with a central scheduler, scheduler-driven retries, and containerized workers you can run those pipelines at production scale on a fleet of worker containers or as Kubernetes Jobs.
Data pipelines that survive nightly spikes, schema changes, and rolling deployments require more than a handful of ad‑hoc scripts. Luigi provides a declarative, Python‑first model for building dependency‑aware workflows, and its ecosystem supplies the plumbing needed for high‑availability orchestration. This post walks through Luigi’s core architecture, shows how to model dependencies reliably, and presents production‑ready patterns—central scheduling, parameterization, idempotent tasks, and observability—that turn a prototype into a resilient data platform.
Architecture Overview
Luigi’s architecture is intentionally minimal: a central scheduler (`luigid`), one or more worker processes, and task definitions written in pure Python. The scheduler keeps task status, parameters, and timestamps in memory, writes that state to a pickle file (`state_path`) when it shuts down, and can optionally record task history in a relational database such as MySQL or PostgreSQL. Workers import the task code and walk the dependency graph locally by calling each task’s `requires()` and `complete()`. They register pending tasks with the scheduler and then ask it for work, executing tasks in-process or, with more than one worker slot, in child processes.
```
+-------------------+        +-------------------+        +-------------------+
| luigid scheduler  |<------>|   Worker nodes    |<------>|  Task code (Py)   |
+-------------------+        +-------------------+        +-------------------+
```
Key properties:
- Deterministic task IDs – a task’s unique identifier is derived from its class name and parameter values. Every worker and every run therefore refers to the same logical step by the same ID, and the scheduler never hands that ID to two workers at once.
- Explicit DAG – each `requires()` method returns other task instances, forming a directed acyclic graph that the worker resolves before execution.
- Pluggable execution – tasks can run directly in the worker process, inside Docker containers (`luigi.contrib.docker_runner`), or as Kubernetes Jobs (`luigi.contrib.kubernetes`). You scale horizontally by adding worker processes rather than by rewriting task logic.
Completion is determined by each task’s output targets rather than by the scheduler’s memory. You can therefore restart workers, upgrade code, or restart the scheduler without breaking the pipeline: on the next run, finished partitions are detected from their outputs and skipped.
Dependency Management in Practice
Defining Tasks and Parameters
A Luigi task inherits from luigi.Task and declares parameters that become part of its unique ID. Parameters can be simple strings, integers, or even dates, enabling fine‑grained partitioning.
```python
import datetime

import luigi


class ExtractFromKafka(luigi.Task):
    topic = luigi.Parameter()
    # DateParameter expects a date, not a datetime. The default is
    # evaluated once, at import time.
    date = luigi.DateParameter(default=datetime.date.today())

    def output(self):
        return luigi.LocalTarget(f"data/raw/{self.topic}/{self.date}.json")

    def run(self):
        # Placeholder for real Kafka consumer logic
        with self.output().open("w") as f:
            f.write('{"sample": "data"}')
```
The `output()` method declares the target that downstream tasks depend on. Luigi’s default `complete()` returns `True` once every output target exists, so a task whose output is already present is skipped rather than rerun.
Expressing Downstream Dependencies
Downstream tasks simply implement requires() to return upstream task instances. Luigi resolves the full dependency tree before any work begins.
```python
import json

import luigi
import pandas as pd


class TransformJson(luigi.Task):
    topic = luigi.Parameter()
    date = luigi.DateParameter()

    def requires(self):
        return ExtractFromKafka(topic=self.topic, date=self.date)

    def output(self):
        return luigi.LocalTarget(f"data/processed/{self.topic}/{self.date}.parquet")

    def run(self):
        with self.input().open("r") as src:
            raw = json.load(src)
        df = pd.json_normalize(raw)
        # temporary_path() creates the parent directory, yields a temp path,
        # and renames it to the final path only if the block succeeds.
        with self.output().temporary_path() as tmp_path:
            df.to_parquet(tmp_path, index=False)  # needs pyarrow or fastparquet
```
Because requires() returns a task instance, Luigi can compute the full DAG even when parameters vary per run (e.g., daily partitions). The scheduler stores each concrete task ID, so it knows precisely which partitions are pending, succeeded, or failed.
Handling Optional or Dynamic Dependencies
In production you often need to branch based on data availability, such as “skip enrichment if the source file is empty.” Because `requires()` is ordinary Python, you can make a dependency conditional by returning an empty list, or a task only when a condition holds.
```python
import os

import luigi


class EnrichIfNeeded(luigi.Task):
    topic = luigi.Parameter()
    date = luigi.DateParameter()

    def requires(self):
        # Only depend on the transform if the raw file exists and is non-empty.
        raw_target = ExtractFromKafka(topic=self.topic, date=self.date).output()
        if raw_target.exists() and os.path.getsize(raw_target.path) > 0:
            return TransformJson(topic=self.topic, date=self.date)
        return []

    def output(self):
        return luigi.LocalTarget(f"data/enriched/{self.topic}/{self.date}.csv")

    def run(self):
        # Enrichment logic goes here. It must write self.output(); otherwise
        # downstream tasks see this partition as missing.
        with self.output().open("w") as f:
            f.write("")  # placeholder
```
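This pattern prevents unnecessary work and keeps the DAG compact, which is especially valuable when you have thousands of daily partitions. One caveat: `requires()` is evaluated when the worker builds the graph, before any task runs. On a cold run the raw file does not exist yet, so the enrichment branch is dropped. When the branch must see upstream results, use Luigi’s dynamic dependencies instead and `yield` the extra task from `run()`.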
Production‑Ready Orchestration Patterns
1. Central Scheduler with High‑Availability Backend
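Out of the box, `luigid` keeps scheduling state in memory, writes it to a pickle file (`state_path`) when it shuts down, and stores no task history. For production, keep the state file on persistent storage and record task history in a robust relational database. PostgreSQL offers ACID guarantees and easy replication.
```ini
# /etc/luigi/luigi.cfg
[scheduler]
state_path = /var/lib/luigi-server/state.pickle
record_task_history = true

[task_history]
db_connection = postgresql://luigi_user:secret@db-host:5432/luigi
```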
Deploy the scheduler as a systemd service or a Docker container that restarts automatically. Example systemd unit:
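```ini
# /etc/systemd/system/luigi-scheduler.service
[Unit]
Description=Luigi Central Scheduler
After=network.target

[Service]
# luigid has no --config flag; it reads LUIGI_CONFIG_PATH (or /etc/luigi/luigi.cfg).
Environment=LUIGI_CONFIG_PATH=/etc/luigi/luigi.cfg
ExecStart=/usr/local/bin/luigid --port 8082
Restart=on-failure
User=luigi

[Install]
WantedBy=multi-user.target
```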
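Because the state file is only rewritten on a clean shutdown, a scheduler that crashes comes back with a stale view. No work is lost, though: workers re-register their tasks on the next run, and completed partitions are recognized from their output targets. The PostgreSQL task-history tables survive scheduler restarts and can be replicated for auditing.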
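2. Scaling Workers Horizontally or on Kubernetes
Luigi does not ship a Celery executor; the central scheduler itself is the dispatcher. Every worker started on the same root task registers that task’s dependency graph with the scheduler and pulls pending tasks from it, and the scheduler never hands the same task ID to two workers at once. Scaling out therefore means starting more worker processes or containers with the same configuration, each running up to `--workers` tasks in parallel.
```ini
# luigi.cfg on every worker
[core]
default_scheduler_host = scheduler-host
default_scheduler_port = 8082

[worker]
# Stay alive while tasks are still pending (e.g. waiting for a retry).
keep_alive = true
```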
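Alternatively, `luigi.contrib.kubernetes.KubernetesJobTask` runs a task’s work as a Kubernetes Job, giving you native cloud scaling and resource isolation. If you template Job manifests yourself, a per-task Job can look like the following. The `{{ ... }}` placeholders are filled in by your templating tool, and `my_task.py` is assumed to call `luigi.run()`.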
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: luigi-task-{{ .TaskID }}
spec:
  template:
    spec:
      containers:
        - name: luigi
          image: myorg/luigi-task:latest
          args: ["python", "my_task.py", "--topic", "{{ .Topic }}", "--date", "{{ .Date }}"]
      restartPolicy: Never
```
Kubernetes then retries failed pods (up to the Job’s `backoffLimit`) and reschedules pods away from failed nodes. With a cluster autoscaler, it also adds nodes when pods are pending, so each task gets isolated, elastically provisioned resources.
3. Idempotent Tasks and Atomic Outputs
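Production pipelines must be able to restart without corrupting downstream data, so every `output()` target should be written atomically: either the complete file appears at its final path or nothing does. `luigi.LocalTarget.open('w')` already does this by writing to a temporary file and renaming it on close. `luigi.contrib.s3.S3Target.open('w')` behaves similarly. It buffers to a local temporary file and uploads the object only when that file is closed without an error, and because an S3 object becomes visible only once its upload completes, readers never see a half-written file.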
```python
import luigi
from luigi.contrib.s3 import S3Target


class LoadToBigQuery(luigi.Task):
    topic = luigi.Parameter()
    date = luigi.DateParameter()

    def requires(self):
        return EnrichIfNeeded(topic=self.topic, date=self.date)

    def output(self):
        # No temporary path is needed: S3Target.open("w") stages the data
        # locally and uploads it on a clean close.
        return S3Target(f"s3://data-lake/processed/{self.topic}/{self.date}.csv")

    def run(self):
        # Stream the enriched CSV line by line; the upload happens only when
        # `dst` closes without an exception.
        with self.input().open("r") as src, self.output().open("w") as dst:
            for line in src:
                dst.write(line)
```
If `run()` raises inside the `with` block, the staged file is never moved or uploaded. Downstream tasks never see a partial output, and the task simply reruns on the next attempt.
4. Automatic Retries and Failure Policies
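Luigi lets you set `retry_count` per task. This is the number of failures allowed within the scheduler’s `disable_window` before the task is disabled. The delay before a failed task becomes pending again is scheduler-wide: it is set by `retry_delay` under `[scheduler]` (900 seconds by default). A retry only runs if a worker is still alive to pick it up, so long-running workers should set `keep_alive = true` under `[worker]`. For flaky external services (e.g., a transient Kafka broker outage) this avoids manual intervention.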
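```python
class ExtractFromKafka(luigi.Task):
    # ... parameters, output() and run() as before ...

    # Disable after 3 failures within [scheduler] disable_window; the delay
    # between attempts comes from [scheduler] retry_delay, not the task.
    retry_count = 3
```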
You can also implement a custom on_failure hook to push alerts to Slack or PagerDuty.
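```python
import os

import luigi
import requests


class BaseAlertTask(luigi.Task):
    def on_failure(self, exception):
        # Webhook URL comes from a (hypothetical) environment variable
        # instead of being hard-coded in source control.
        webhook = os.environ.get("SLACK_WEBHOOK_URL")
        if webhook:
            payload = {"text": f"Task {self.task_id} failed: {exception}"}
            try:
                requests.post(webhook, json=payload, timeout=10)
            except requests.RequestException:
                pass  # never let alerting mask the original failure
        # Keep Luigi's default: return the traceback shown by the scheduler.
        return super().on_failure(exception)
```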
Having every production task inherit from `BaseAlertTask` gives you consistent alerting without repeating the hook.
5. Monitoring, Metrics, and Auditing
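The scheduler ships a built‑in web UI at http://scheduler-host:8082 that shows task status, runtime, and dependency graphs. For deeper observability, recent Luigi releases can expose scheduler metrics to Prometheus. To enable this, set `metrics_collector = prometheus` under `[scheduler]` (this needs the `prometheus_client` package) and scrape the scheduler’s `/metrics` endpoint.
```ini
# luigi.cfg on the scheduler host
[scheduler]
metrics_collector = prometheus
```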
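Instrument critical sections inside `run()` with custom counters. Luigi workers are short-lived batch processes, so push those counters to a Prometheus Pushgateway at the end of the run instead of waiting for a scrape.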
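```python
from prometheus_client import CollectorRegistry, Counter, push_to_gateway


class TransformJson(luigi.Task):
    # ... parameters, requires() and output() as before ...
    def run(self):
        # Per-run registry: the worker exits before Prometheus could scrape it.
        registry = CollectorRegistry()
        records = Counter("luigi_records_processed",
                          "Number of records processed per task", registry=registry)
        # ... load raw json into df ...
        records.inc(len(df))
        # ... write parquet ...
        push_to_gateway("prometheus-pushgateway:9091",
                        job=self.get_task_family(), registry=registry)
```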
Grafana dashboards built on these metrics give you real‑time insight into throughput, latency spikes, and failure rates—essential for SLO compliance.
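One detail matters when you build those dashboards: `prometheus_client` exposes a counter named `luigi_records_processed` as `luigi_records_processed_total`. This sketch, assuming only `prometheus_client`, adds a hypothetical `task_family` label and prints the exposition text. This is the same payload that `push_to_gateway` sends.

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
records = Counter(
    "luigi_records_processed",
    "Number of records processed per task",
    ["task_family"],  # hypothetical label so dashboards can split by task
    registry=registry,
)
records.labels(task_family="TransformJson").inc(1200)
records.labels(task_family="EnrichIfNeeded").inc(300)

print(generate_latest(registry).decode())
value = registry.get_sample_value(
    "luigi_records_processed_total", {"task_family": "TransformJson"}
)
print("TransformJson records:", value)
```

In Grafana, query the `_total` series (for example with `rate()`), not the bare counter name.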
Scaling Luigi for Thousands of Daily Partitions
When a pipeline processes dozens of topics across a 30‑day retention window, a single backfill can easily put more than 1,000 tasks into the graph (for example, 40 topics × 30 days = 1,200 partitions). Here are three tactics that keep scheduling responsive:
Batch Partition Scheduling – Instead of launching a separate Luigi run for each partition, use a wrapper task that expands into many child tasks. Its `requires()` can simply return a list comprehension.
```python
class DailyBatch(luigi.WrapperTask):
    date = luigi.DateParameter()

    def requires(self):
        topics = ["clicks", "impressions", "orders"]
        return [EnrichIfNeeded(topic=t, date=self.date) for t in topics]
```
Task Pruning with `complete()` – Override `complete()` to decide exactly when a task counts as done, for example only when its output is at least as new as its input. The worker does not descend into the dependencies of a task that reports complete, so a cheap, accurate `complete()` prunes whole subgraphs from scheduling. Because it runs for every task in the graph, keep it to cheap metadata checks.
```python
import os


class LoadToBigQuery(luigi.Task):
    # ... parameters, requires(), output() and run() as before ...

    def complete(self):
        # Assumes local input and output targets; remote targets need their
        # own modification-time lookup.
        out, src = self.output(), self.input()
        if not (out.exists() and src.exists()):
            return False
        return os.path.getmtime(out.path) >= os.path.getmtime(src.path)
```
Scheduler Sharding – For massive fleets, run multiple scheduler instances, each responsible for a disjoint group of pipelines (e.g., per business unit). Luigi does not route tasks between schedulers. Instead, each group’s workers point at their own scheduler in their config, and giving each group’s tasks a distinct `task_namespace` keeps task families from colliding.
```ini
# luigi.cfg for the finance workers
[core]
default_scheduler_host = finance-scheduler
default_scheduler_port = 8082
```
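These patterns keep each scheduler’s in-memory state, and its web UI, small enough to stay responsive as partitions multiply. Once you shard, a global view of pipeline health comes from aggregated metrics and dashboards rather than from any single scheduler.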
## Real‑World Example: Kafka → Spark → BigQuery
Below is a concise end‑to‑end pipeline that ingests a Kafka topic, runs a Spark job for heavy transformation, and lands the result in BigQuery. The example demonstrates **cross‑technology orchestration** using Luigi as the glue.
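BigQuery load jobs read from Google Cloud Storage (or local files), not from S3, so this version stages intermediate data in GCS. Bucket, project, dataset, and script names are placeholders.
```python
# kafka_to_bigquery.py
import subprocess

import luigi
from luigi.contrib.bigquery import BigQueryTarget
from luigi.contrib.gcs import GCSFlagTarget, GCSTarget
from luigi.contrib.spark import SparkSubmitTask

from extract import ExtractFromKafka  # hypothetical module holding the task above


class SparkTransform(luigi.Task):
    """Stages the raw Kafka dump in GCS where the Spark job can read it."""

    topic = luigi.Parameter()
    date = luigi.DateParameter()

    def requires(self):
        return ExtractFromKafka(topic=self.topic, date=self.date)

    def output(self):
        return GCSTarget(f"gs://my-bucket/tmp/{self.topic}/{self.date}.json")

    def run(self):
        # Text in, text out; the upload happens when `dst` closes cleanly.
        with self.input().open("r") as src, self.output().open("w") as dst:
            for line in src:
                dst.write(line)


class SubmitSparkJob(SparkSubmitTask):
    topic = luigi.Parameter()
    date = luigi.DateParameter()
    spark_master = luigi.Parameter(default="spark://spark-master:7077")

    # SparkSubmitTask expects `app` as an attribute (path to the .py or .jar).
    app = "spark-jobs/transform.py"

    @property
    def master(self):
        # Overrides the [spark] master config value.
        return self.spark_master

    def requires(self):
        # requires(), not input(), so Luigi actually schedules the dependency.
        return SparkTransform(topic=self.topic, date=self.date)

    def app_options(self):
        # Arguments passed to transform.py after the app path.
        return ["--input", self.input().path, "--output", self.output().path]

    def output(self):
        # Spark writes a directory of part files plus a _SUCCESS marker.
        return GCSFlagTarget(f"gs://my-bucket/processed/{self.topic}/{self.date}/")


class LoadToBigQuery(luigi.Task):
    topic = luigi.Parameter()
    date = luigi.DateParameter()

    def requires(self):
        return SubmitSparkJob(topic=self.topic, date=self.date)

    def output(self):
        # One table per day (date-sharded), so each partition has its own
        # completion check; a single shared table would look "done" forever.
        return BigQueryTarget("myproject", "dataset",
                              f"{self.topic}_{self.date:%Y%m%d}")

    def run(self):
        # Use the bq command line for simplicity; --replace keeps reruns idempotent.
        table = self.output().table
        subprocess.run(
            [
                "bq", "load", "--replace", "--source_format=PARQUET",
                f"{table.project_id}:{table.dataset_id}.{table.table_id}",
                self.input().path + "*.parquet",
            ],
            check=True,
        )
```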
Run a single partition from the command line; a daily cron job would pass the current date:
```bash
luigi --module kafka_to_bigquery LoadToBigQuery --topic clicks --date 2026-05-20 \
  --local-scheduler --workers 8
```
In production you would drop `--local-scheduler` so the worker connects to the central scheduler configured in `luigi.cfg`. Running the command inside a Kubernetes CronJob keeps each daily run isolated and observable.
Key Takeaways
- Luigi’s target-based completeness checks mean a task whose output already exists is not rerun, and the scheduler never hands the same task ID to two workers at once. This makes Luigi a good fit for partitioned data pipelines.
- A central scheduler coordinates the workers, while output targets remain the record of what is done. Keep the scheduler’s state file on persistent storage and record task history in PostgreSQL.
- Scale out by adding worker processes or containers pointed at the same scheduler, or by running tasks as Kubernetes Jobs, without changing task code.
- Idempotent outputs and atomic writes prevent partial failures from corrupting downstream data.
- Scheduler-driven retries, failure hooks, and Prometheus metrics give you robust error handling and observability.
- Production patterns such as wrapper tasks, `complete()` pruning, and scheduler sharding keep the system performant as data volume grows.