Checkpoints
Checkpoints allow DataChain to automatically skip re-creating datasets that were successfully saved in previous script runs. When a script fails or is interrupted, you can re-run it and DataChain will resume from where it left off, reusing datasets that were already created.
Checkpoints are available for both local script runs and Studio executions.
How Checkpoints Work
Local Script Runs
When you run a Python script locally (e.g., python my_script.py), DataChain automatically:
- Creates a job for the script execution, using the script's absolute path as the job name
- Tracks parent jobs by finding the last job with the same script name
- Calculates hashes for each dataset save operation based on the DataChain operations chain
- Creates checkpoints after each successful .save() call, storing the hash
- Checks for existing checkpoints on subsequent runs - if a matching checkpoint exists in the parent job, DataChain skips the save and reuses the existing dataset
This means that if your script creates multiple datasets and fails partway through, the next run will skip recreating the datasets that were already successfully saved.
Studio Runs
When running jobs on Studio, the checkpoint workflow is managed through the UI:
- Job execution is triggered using the Run button in the Studio interface
- Checkpoint control is explicit - you choose between:
  - Run from scratch: Ignores any existing checkpoints and recreates all datasets
  - Continue from last checkpoint: Resumes from the last successful checkpoint, skipping already-completed stages
- Parent-child job linking is handled automatically by the system - no need for script path matching or job name conventions
- Checkpoint behavior during execution is the same as local runs: datasets are saved at each .save() call and can be reused on retry
Example
Consider this script that processes data in multiple stages:
```python
import datachain as dc

# Stage 1: Load and filter data
filtered = (
    dc.read_csv("s3://mybucket/data.csv")
    .filter(dc.C("score") > 0.5)
    .save("filtered_data")
)

# Stage 2: Transform data
transformed = (
    filtered
    .map(value=lambda x: x * 2, output=float)
    .save("transformed_data")
)

# Stage 3: Aggregate results
result = (
    transformed
    .agg(
        total=lambda values: sum(values),
        partition_by="category",
    )
    .save("final_results")
)
```
First run: The script executes all three stages and creates three datasets: filtered_data, transformed_data, and final_results. If the script fails during Stage 3, only filtered_data and transformed_data are saved.
Second run: DataChain detects that filtered_data and transformed_data were already created in the parent job with matching hashes. It skips recreating them and proceeds directly to Stage 3, creating only final_results.
When Checkpoints Are Used
Checkpoints are automatically used when:
- Running a Python script locally (e.g., python my_script.py)
- The script has been run before
- A dataset with the same name is being saved
- The chain hash matches a checkpoint from the parent job
Checkpoints are not used when:
- Running code interactively (Python REPL, Jupyter notebooks)
- Running code as a module (e.g., python -m mymodule)
- The DATACHAIN_CHECKPOINTS_RESET environment variable is set (see below)
Resetting Checkpoints
To ignore existing checkpoints and run your script from scratch, set the DATACHAIN_CHECKPOINTS_RESET environment variable:
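```bash
# Assumed usage - any truthy value (e.g. 1 or true) is expected to enable the reset
export DATACHAIN_CHECKPOINTS_RESET=true
python my_script.py
```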
Or set it inline:
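```bash
# Inline form (same assumed truthy value)
DATACHAIN_CHECKPOINTS_RESET=true python my_script.py
```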
This forces DataChain to recreate all datasets, regardless of existing checkpoints.
How Job Names Are Determined
DataChain uses different strategies for naming jobs depending on how the code is executed:
Script Execution (Checkpoints Enabled)
When running python my_script.py, DataChain uses the absolute path to the script as the job name:
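```
# Illustrative job name: the script's absolute path
/home/user/projects/my_script.py
```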
This allows DataChain to link runs of the same script together as parent-child jobs, enabling checkpoint lookup.
Interactive or Module Execution (Checkpoints Disabled)
When running code interactively or as a module, DataChain uses a unique UUID as the job name:
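```
# Illustrative job name: a randomly generated UUID
550e8400-e29b-41d4-a716-446655440000
```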
This prevents unrelated executions from being linked together, but also means checkpoints cannot be used.
How Checkpoint Hashes Are Calculated
For each .save() operation, DataChain calculates a hash based on:
- The hash of the previous checkpoint in the current job (if any)
- The hash of the current DataChain operations chain
This creates a chain of hashes that uniquely identifies each stage of data processing. On subsequent runs, DataChain matches these hashes against the parent job's checkpoints and skips recreating datasets where the hashes match.
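Conceptually, the chaining works roughly like this (a simplified sketch, not DataChain's actual implementation; the function name and the hash inputs are illustrative):

```python
import hashlib

def checkpoint_hash(prev_hash: str | None, chain_hash: str) -> str:
    # Combine the previous checkpoint's hash (if any) with the hash of the
    # current operations chain to produce this checkpoint's hash.
    return hashlib.sha256(((prev_hash or "") + chain_hash).encode()).hexdigest()

# Three consecutive .save() calls in one script form a hash chain:
h1 = checkpoint_hash(None, "hash_of_stage1_chain")
h2 = checkpoint_hash(h1, "hash_of_stage2_chain")
h3 = checkpoint_hash(h2, "hash_of_stage3_chain")
```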
Hash Invalidation
Checkpoints are automatically invalidated when you modify the chain. Any change to the DataChain operations will result in a different hash, causing DataChain to skip the checkpoint and recompute the dataset.
Changes that invalidate checkpoints include:
- Modifying filter conditions: .filter(dc.C("score") > 0.5) → .filter(dc.C("score") > 0.8)
- Changing map/gen/agg functions: Any modification to UDF logic
- Altering function parameters: Changes to column names, output types, or other parameters
- Adding or removing operations: Inserting new .filter(), .map(), or other steps
- Reordering operations: Changing the sequence of transformations
Example
```python
# First run - creates three checkpoints
dc.read_csv("data.csv").save("stage1")                            # Hash = H1
dc.read_dataset("stage1").filter(dc.C("x") > 5).save("stage2")    # Hash = H2 = hash(H1 + pipeline_hash)
dc.read_dataset("stage2").select("name", "value").save("stage3")  # Hash = H3 = hash(H2 + pipeline_hash)
```
Second run (no changes): all three hashes match → all three datasets are reused → no computation.
Second run (modified filter):
```python
dc.read_csv("data.csv").save("stage1")                            # Hash = H1 matches ✓ → reused
dc.read_dataset("stage1").filter(dc.C("x") > 10).save("stage2")   # Hash ≠ H2 ✗ → recomputed
dc.read_dataset("stage2").select("name", "value").save("stage3")  # Hash ≠ H3 ✗ → recomputed
```
Because the filter changed, stage2 has a different hash and must be recomputed. Since stage3 depends on stage2, its hash also changes (because it includes H2 in the calculation), so it must be recomputed as well.
Key insight: Modifying any step in the chain invalidates that checkpoint and all subsequent checkpoints, because the hash chain is broken.
Dataset Persistence
Starting with the checkpoints feature, datasets created during script execution persist even if the script fails or is interrupted. This is essential for checkpoint functionality, as it allows subsequent runs to reuse successfully created datasets.
If you need to clean up datasets from failed runs, you can use:
```python
import datachain as dc

# Remove a specific dataset
dc.delete_dataset("dataset_name")

# List all datasets to see what's available
for ds in dc.datasets():
    print(ds.name)
```
UDF-Level Checkpoints
DataChain automatically creates checkpoints for UDFs (.map(), .gen(), .agg()), not just at .save() calls. For .map() and .gen(), DataChain saves processed rows continuously during UDF execution, not only when the UDF completes. If your script fails partway through a UDF, the next run will skip already-processed rows and continue where it left off - even if you've modified the UDF code to fix a bug.
Note: For .agg(), checkpoints are created when the aggregation completes successfully, but partial results are not tracked. If an aggregation fails partway through, it will restart from scratch on the next run.
How It Works
When executing .map() or .gen(), DataChain:
- Saves processed rows incrementally as the UDF processes your dataset
- Creates a checkpoint when the UDF completes successfully
- Allows you to fix bugs and continue - if the UDF fails, you can modify the code and re-run, skipping already-processed rows
- Invalidates the checkpoint if you change the UDF after successful completion - completed UDFs are recomputed from scratch if the code changes
For .agg(), checkpoints are only created upon successful completion, without incremental progress tracking.
Example: Fixing a Bug Mid-Execution
```python
import datachain as dc
from datachain import File
from PIL import Image

def process_image(file: File) -> dict[str, int]:
    # Bug: this will fail on some images
    img = Image.open(file.get_local_path())
    return {"width": img.size[0], "height": img.size[1]}

result = (
    dc.read_dataset("images")
    .map(process_image, output={"width": int, "height": int})
    .save("image_dimensions")
)
```
First run: Script processes 50% of images successfully, then fails on a corrupted image.
After fixing the bug:
```python
from datachain import File
from PIL import Image

def process_image(file: File) -> dict[str, int]:
    # Fixed: handle corrupted images gracefully
    try:
        img = Image.open(file.get_local_path())
        return {"width": img.size[0], "height": img.size[1]}
    except Exception:
        return {"width": 0, "height": 0}
```
Second run: DataChain automatically skips the 50% of images that were already processed successfully, and continues processing the remaining images using the fixed code. You don't lose any progress from the first run.
When UDF Checkpoints Are Invalidated
DataChain distinguishes between two types of UDF changes:
1. Code-Only Changes (Bug Fixes) - Continues from Partial Results
When you fix a bug in your UDF code without changing the output schema, DataChain allows you to continue from where the UDF failed. This is the key benefit of UDF-level checkpoints - you don't lose progress when fixing bugs.
Example: Bug fix without schema change
```python
# First run - fails partway through
def process(num) -> int:
    if num > 100:
        raise Exception("Bug!")  # Oops, a bug!
    return num * 10

# Second run - continues from where it failed
def process(num) -> int:
    return num * 10  # Bug fixed! ✓ Continues from partial results
```
In this case, DataChain will skip already-processed rows and continue processing the remaining rows with your fixed code.
2. Output Schema Changes - Forces Re-run from Scratch
When you change the output type or schema of your UDF, DataChain automatically detects this and reruns the entire UDF from scratch. This prevents schema mismatches that would cause errors or corrupt data.
Example: Schema change
```python
# First run - fails partway through
def process(num) -> int:
    if num > 100:
        raise Exception("Bug!")
    return num * 10

# Second run - output type changed
def process(num) -> str:
    return f"value_{num * 10}"  # Output type changed! ✗ Reruns from scratch
```
In this case, DataChain detects that the output changed from int to str and discards partial results to avoid schema incompatibility. All rows will be reprocessed with the new output schema.
Changes That Invalidate In-Progress UDF Checkpoints
Partial results are automatically discarded when you change:
- Output type or schema - Changes to the output parameter or return type annotations
- Operations before the UDF - Any changes to the data processing chain before the UDF
Changes That Invalidate Completed UDF Checkpoints
Once a UDF completes successfully, its checkpoint is tied to the UDF function code. If you modify the function and re-run the script, DataChain will detect the change and recompute the entire UDF from scratch.
Changes that invalidate completed UDF checkpoints:
- Modifying the UDF function logic - Any code changes inside the function
- Changing function parameters or output types - Changes to input/output specifications
- Altering any operations before the UDF in the chain - Changes to upstream data processing
Key takeaway: For in-progress (partial) UDFs, you can fix bugs freely as long as the output schema stays the same. For completed UDFs, any code change triggers a full recomputation.
Forcing UDF to Start from Scratch
If you want to ignore any in-progress UDF work and recompute from the beginning, set the DATACHAIN_UDF_RESET environment variable:
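```bash
# Assumed usage - any truthy value is expected to trigger the reset
DATACHAIN_UDF_RESET=true python my_script.py
```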
This forces all UDFs to restart from scratch, discarding any checkpointed progress. This is useful when:
- You've changed the UDF logic and want to reprocess already-completed rows
- You suspect the checkpointed data is corrupted
- You want to ensure a clean computation for debugging
UDF Checkpoints vs Dataset Checkpoints
DataChain uses two levels of checkpoints:
- Dataset checkpoints (via .save()) - Skip recreating entire datasets if the chain hasn't changed
- UDF checkpoints (automatic) - Resume in-progress UDFs from where they left off
Both work together: if you have multiple .map() calls followed by a .save(), DataChain will resume from the last incomplete UDF. If all UDFs completed but the script failed before .save(), the next run will skip all UDFs and go straight to the save.
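For example, in a chain like the following (a minimal sketch; the dataset name, column names, and UDFs are illustrative), a failure inside the second .map() is resumed at that UDF on the next run:

```python
import datachain as dc

result = (
    dc.read_dataset("numbers")                              # assumed existing dataset with a "num" column
    .map(doubled=lambda num: num * 2, output=int)           # UDF 1
    .map(label=lambda doubled: f"v{doubled}", output=str)   # UDF 2
    .save("labeled_numbers")
)
# If the script fails inside UDF 2, the next run reuses UDF 1's completed
# checkpoint, resumes UDF 2 from its already-processed rows, then saves.
# If both UDFs completed but the save never ran, the next run skips both
# UDFs and goes straight to the save.
```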
Limitations
When running locally:
- Script-based: Code must be run as a script (not interactively or as a module).
- Same script path: The script must be run from the same absolute path for parent job linking to work.
These limitations don't apply when running on Studio, where parent-child job linking is handled automatically by the platform.
Future Plans
Partial Result Tracking for Aggregations
Currently, .agg() creates checkpoints only upon successful completion, without tracking partial progress. Future versions will extend the same incremental progress tracking that .map() and .gen() have to aggregations, allowing them to resume from where they failed rather than restarting from scratch.