Partition backfill strategies
In this example, we'll explore three strategies for backfilling partitioned assets. When you need to materialize many partitions (for initial setup or reprocessing), you can choose Dagster's default one-run-per-partition approach, a batched approach, or a single-run approach using BackfillPolicy. Each strategy has distinct trade-offs in overhead, fault isolation, and resource utilization.
| Factor | One per partition | Batched | Single run |
|---|---|---|---|
| Run overhead | High (N runs) | Medium (N/batch runs) | Low (1 run) |
| Fault isolation | Best | Moderate | None |
| Retry cost | 1 partition | 1 batch | All partitions |
| Observability | Per partition | Per batch | Aggregate only |
Problem: Backfilling 100 days of historical data
Imagine you need to backfill 100 days of historical event data. Each day's data needs to be processed and stored. Without optimization, this could mean launching 100 separate runs, each with its own overhead. But processing everything in one run means a single failure requires reprocessing all 100 days.
The key question is: How should you batch your partitions to balance overhead, fault isolation, and performance?
| Solution | Best for |
|---|---|
| One run per partition (default) | Unreliable data sources, API rate limits, fine-grained retry capability, per-partition observability |
| Batched runs | Reducing overhead while maintaining fault isolation, short processing times, initial backfills |
| Single run | Spark/Snowflake/Databricks, range-based queries, minimizing Dagster Cloud credits |
Solution 1: One run per partition (default)
By default, Dagster launches one run per partition. This provides maximum observability and fault isolation: if one partition fails, the others continue independently, and each failed partition can be retried on its own. For 100 partitions, this creates 100 separate runs, each with its own startup overhead. This approach is best when your data source is unreliable (API rate limits, transient failures), when you need fine-grained retry capability for individual partitions, or when per-partition observability is critical.
```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(partitions_def=daily_partitions)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for a single day. Each partition runs separately."""
    partition_date = context.partition_key
    context.log.info(f"Processing events for {partition_date}")

    # Process data for this single partition
    events = fetch_events_for_date(partition_date)
    processed = transform_events(events)

    context.log.info(f"Processed {len(processed)} events for {partition_date}")
    return processed


def fetch_events_for_date(date: str) -> list:
    # Simulate fetching events for a specific date
    return [{"date": date, "event_id": i} for i in range(100)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]
```
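Because each partition materializes in its own run, this strategy pairs naturally with op-level retries for transient failures. A minimal sketch extending the module above (the retry values are illustrative, not recommendations):

```python
@dg.asset(
    partitions_def=daily_partitions,
    # Illustrative values: retry up to 3 times, waiting 30 seconds
    # between attempts, so transient API failures heal on their own
    retry_policy=dg.RetryPolicy(max_retries=3, delay=30),
)
def daily_events_with_retries(context: dg.AssetExecutionContext):
    """Same per-partition processing, with automatic retries on failure."""
    events = fetch_events_for_date(context.partition_key)
    return transform_events(events)
```

Since each run touches exactly one partition, a retry re-fetches only that partition's data.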
Solution 2: Batched runs
With BackfillPolicy.multi_run, Dagster groups partitions into batches. For example, with max_partitions_per_run=10, 100 partitions become 10 runs of 10 partitions each. This cuts the number of runs by 90% while maintaining moderate fault isolation: if one partition fails, only its batch of 10 needs to retry. This approach works well when you want to reduce overhead while keeping some fault isolation, when processing time per partition is short (seconds to a few minutes), or when you're doing an initial backfill of many partitions.
```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for a batch of days in each run."""
    partition_keys = context.partition_keys
    context.log.info(f"Processing {len(partition_keys)} partitions in this run")

    # Process data for all partitions in this batch
    all_events = []
    for partition_key in partition_keys:
        events = fetch_events_for_date(partition_key)
        all_events.extend(events)

    processed = transform_events(all_events)
    context.log.info(
        f"Processed {len(processed)} total events across {len(partition_keys)} days"
    )
    return processed


def fetch_events_for_date(date: str) -> list:
    # Simulate fetching events for a specific date
    return [{"date": date, "event_id": i} for i in range(100)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]
```
When using BackfillPolicy.multi_run, consider:
- Overhead reduction: Batch size of 10 reduces runs by 90%
- Failure blast radius: If one partition fails, the entire batch retries; see the sketch after this list for one mitigation
- Memory usage: More partitions per run may require more memory
- Processing model: Sequential processing means larger batches take longer
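One way to soften the blast radius is to attempt every partition in the batch before failing the run, so a single bad day doesn't stop the rest of its batch from being tried. A minimal sketch extending the Solution 2 module above (it reuses daily_partitions, fetch_events_for_date, and transform_events):

```python
@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def daily_events_tolerant(context: dg.AssetExecutionContext):
    """Attempt every partition in the batch before surfacing failures."""
    failed = []
    for partition_key in context.partition_keys:
        try:
            transform_events(fetch_events_for_date(partition_key))
        except Exception as e:
            context.log.error(f"Partition {partition_key} failed: {e}")
            failed.append(partition_key)
    # Raising still fails (and retries) the whole batch, but the logs
    # now pinpoint exactly which partitions need attention
    if failed:
        raise Exception(f"{len(failed)} partitions failed: {failed}")
```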
Recommended starting points:
| Partition processing time | Suggested batch size |
|---|---|
| Under 1 minute | 20-50 |
| 1-5 minutes | 10-20 |
| 5-15 minutes | 5-10 |
| Over 15 minutes | 1-5 or single run |
Adjust based on observed failure rates and infrastructure constraints. For ways to parallelize work within each batch, see Parallelization within batched runs below.
Solution 3: Single run
With BackfillPolicy.single_run, Dagster processes all selected partitions in one run, so you pay run startup overhead only once. For 100 partitions, this creates just 1 run. However, a failure requires retrying all partitions together. This approach is ideal when you're using a parallel-processing engine (Spark, Snowflake, Databricks), when your queries naturally operate on date ranges, or when you want to minimize Dagster Cloud credit consumption.
```python
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for multiple days in a single run."""
    start_datetime, end_datetime = context.partition_time_window
    context.log.info(f"Processing events from {start_datetime} to {end_datetime}")

    # Process data for the entire partition range at once
    events = fetch_events_for_range(start_datetime, end_datetime)
    processed = transform_events(events)

    context.log.info(f"Processed {len(processed)} events in single run")
    return processed


def fetch_events_for_range(start, end) -> list:
    # Simulate fetching events for a date range (e.g., SQL WHERE clause)
    return [{"start": str(start), "end": str(end), "event_id": i} for i in range(1000)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]
```
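The partition time window translates directly into a range predicate, which is what makes this strategy a natural fit for engines like Snowflake or Spark. A sketch of the pattern, reusing daily_partitions from above (the table and column names are hypothetical; note that Dagster time windows are half-open, so the end bound is exclusive):

```python
@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def daily_events_sql(context: dg.AssetExecutionContext) -> None:
    """Push the entire partition range down into one warehouse query."""
    start, end = context.partition_time_window
    # Hypothetical table and column names; >= start, < end matches
    # Dagster's half-open time window semantics
    query = f"""
        INSERT INTO daily_events_processed
        SELECT * FROM raw_events
        WHERE event_time >= '{start}' AND event_time < '{end}'
    """
    context.log.info(f"Submitting range query for {start} to {end}")
    # Execute `query` against your warehouse connection here
```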
The table below maps common scenarios to a recommended strategy:

| Scenario | Recommended strategy |
|---|---|
| API with rate limits or transient failures | One per partition |
| Short processing time, reliable source | Batched (10-50 per run) |
| Spark/Snowflake with range queries | Single run |
| Cost optimization in Dagster Cloud | Single run or large batches |
| Initial backfill of 1000+ partitions | Batched (50-100 per run) |
Parallelization within batched runs
When using BackfillPolicy.multi_run, each run receives multiple partition keys. The strategies below show different ways to parallelize processing within that single run.
| Strategy | Best for | Max concurrency | Overhead | Complexity |
|---|---|---|---|---|
| Batch query | SQL databases | N/A (single query) | Very low | Very low |
| Thread pool | I/O-bound tasks | 10-100 threads | Low | Low |
| Process pool | CPU-bound tasks | Number of CPU cores | Medium | Low |
Strategy 1: Batch query (fastest for databases)
Process all partitions in a single database query:
```python
import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def process_customers_batch(context: dg.AssetExecutionContext) -> None:
    """Process all partitions in a single database query."""
    customer_ids = context.partition_keys

    # Single query for all customers - most efficient for databases
    query = "SELECT * FROM customers WHERE customer_id IN %s"
    results = execute_query(query, customer_ids)

    context.log.info(f"Processed {len(results)} customers in single query")


def execute_query(query: str, params: list) -> list:
    # Simulated database query
    return [{"customer_id": p, "data": "..."} for p in params]
```
Best for: SQL databases, REST APIs with batch endpoints
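The same idea extends to REST APIs whose batch endpoints cap the number of IDs per request: chunk the partition keys client-side. A sketch against a hypothetical endpoint (the URL, payload shape, and chunk size are assumptions):

```python
import requests


def fetch_customers_batched(customer_ids: list, chunk_size: int = 25) -> list:
    """Call a hypothetical batch endpoint, chunk_size IDs at a time."""
    results = []
    for i in range(0, len(customer_ids), chunk_size):
        chunk = customer_ids[i : i + chunk_size]
        # Hypothetical endpoint; adjust the URL and payload to your API
        resp = requests.post(
            "https://api.example.com/customers/batch", json={"ids": chunk}
        )
        resp.raise_for_status()
        results.extend(resp.json())
    return results
```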
Strategy 2: Thread pool (I/O-bound operations)
Use threads for parallel I/O operations:
```python
from concurrent.futures import ThreadPoolExecutor

import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def fetch_customer_data(context: dg.AssetExecutionContext) -> None:
    """Use threads for parallel I/O operations."""
    customer_ids = context.partition_keys

    def fetch_one(customer_id):
        # Simulated API call
        return {"customer_id": customer_id, "data": f"data for {customer_id}"}

    # Process up to 5 customers concurrently
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_one, customer_ids))

    context.log.info(f"Fetched {len(results)} customers with thread pool")
```
Best for: HTTP requests, file I/O, database queries
Parallelism: Limited by max_workers (5 concurrent in this example)
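One caveat: executor.map re-raises the first worker exception when you iterate its results, which fails the whole batch. If you want every partition attempted regardless, as_completed lets you collect failures individually. A self-contained sketch:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_one(customer_id: str) -> dict:
    # Simulated API call, as in the example above
    return {"customer_id": customer_id, "data": f"data for {customer_id}"}


def fetch_all(customer_ids: list) -> tuple[list, list]:
    """Fetch concurrently, collecting per-item failures instead of aborting."""
    results, failed = [], []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(fetch_one, cid): cid for cid in customer_ids}
        for future in as_completed(futures):
            customer_id = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                failed.append((customer_id, str(e)))
    return results, failed
```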
Strategy 3: Process pool (CPU-bound operations)
Use processes for parallel CPU-intensive work:
```python
from concurrent.futures import ProcessPoolExecutor

import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


def analyze_one(customer_id: str) -> dict:
    """CPU-intensive analysis - must be defined at module level for ProcessPool."""
    # Simulated CPU-intensive work
    result = sum(i * i for i in range(100000))
    return {"customer_id": customer_id, "analysis": result}


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def analyze_customer_data(context: dg.AssetExecutionContext) -> None:
    """Use processes for parallel CPU-intensive work."""
    customer_ids = context.partition_keys

    # Use multiple CPU cores
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(analyze_one, customer_ids))

    context.log.info(f"Analyzed {len(results)} customers with process pool")
```
Best for: CPU-intensive computations, data transformations
Parallelism: Limited by CPU cores
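Rather than hard-coding max_workers=4, a common heuristic (an assumption, not a Dagster requirement) is to size the pool from the machine's core count:

```python
import os
from concurrent.futures import ProcessPoolExecutor

# os.cpu_count() can return None, so fall back to a single worker
max_workers = os.cpu_count() or 1

with ProcessPoolExecutor(max_workers=max_workers) as executor:
    # Reuses the module-level analyze_one from the example above
    results = list(executor.map(analyze_one, ["customer_a", "customer_b"]))
```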