This interactive guide shows common patterns for using dags. Run each cell to see how the library works step by step.
Let’s start by importing the libraries:
import numpy as np
import dags

Pattern 1: Building Computational Pipelines
The core use case for dags is combining multiple interdependent functions into a single callable. Let’s build a simple data processing pipeline step by step.
First, we define three functions that depend on each other:
def cleaned_data(raw_data):
    """Filter out negative values."""
    return [x for x in raw_data if x > 0]

def statistics(cleaned_data):
    """Compute summary stats from cleaned data."""
    return {
        "mean": sum(cleaned_data) / len(cleaned_data),
        "count": len(cleaned_data),
    }

def report(statistics, cleaned_data):
    """Generate a text report."""
    return f"Processed {statistics['count']} items, mean: {statistics['mean']}"

Notice how the parameter names create a dependency graph:

cleaned_data depends on raw_data (external input)
statistics depends on cleaned_data
report depends on both statistics and cleaned_data
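You can confirm this structure programmatically with dags.get_ancestors (covered in more detail in Pattern 5), which lists every function and external input a target relies on:

dags.get_ancestors(
    functions={"cleaned_data": cleaned_data, "statistics": statistics, "report": report},
    targets=["report"],
    include_targets=True,
)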
Now let’s combine them into a pipeline:
functions = {
    "cleaned_data": cleaned_data,
    "statistics": statistics,
    "report": report,
}

pipeline = dags.concatenate_functions(
    functions=functions,
    targets=["report"],
    return_type="dict",
)

Let's run it with some test data:
result = pipeline(raw_data=[1, -2, 3, 4, -5, 6])
result
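Because we asked for return_type="dict", the result is a dictionary keyed by target name, so the report text can be pulled out directly:

result["report"]

Try changing the input data to see different results: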
pipeline(raw_data=[10, 20, -30, 40, 50])

Example: Economic Model with Utility Maximization
Now let’s build something more realistic—a consumer choice model. We’ll define the building blocks first:
def utility(consumption, risk_aversion):
    """CRRA utility function."""
    if risk_aversion == 1:
        return np.log(consumption)
    return (consumption ** (1 - risk_aversion)) / (1 - risk_aversion)

def budget_constraint(income, price):
    """Maximum affordable consumption."""
    return income / price

def feasible(consumption, budget_constraint):
    """Check if consumption is affordable."""
    return consumption <= budget_constraint

def optimal_utility(budget_constraint, risk_aversion):
    """Find maximum utility over a grid of consumption values."""
    consumption_grid = np.linspace(0.1, budget_constraint, 100)
    if risk_aversion == 1:
        utilities = np.log(consumption_grid)
    else:
        utilities = (consumption_grid ** (1 - risk_aversion)) / (1 - risk_aversion)
    return float(np.max(utilities))

Let's collect all functions:
functions = {
    "utility": utility,
    "budget_constraint": budget_constraint,
    "feasible": feasible,
    "optimal_utility": optimal_utility,
}

Now the power of dags becomes clear: we can create different combined functions from the same building blocks depending on what we need.
Use case 1: Solve for optimal utility given income and prices:
solve_model = dags.concatenate_functions(
    functions=functions,
    targets=["optimal_utility"],
    return_type="dict",
)
solve_model(income=1000, price=10, risk_aversion=2)

Use case 2: Evaluate whether a specific consumption choice is feasible and what utility it gives:
evaluate_choice = dags.concatenate_functions(
    functions=functions,
    targets=["utility", "feasible"],
    return_type="dict",
)
evaluate_choice(income=1000, price=10, consumption=50, risk_aversion=2)

What if consumption is too high?

evaluate_choice(income=1000, price=10, consumption=150, risk_aversion=2)

Use case 3: Just compute the budget constraint (dags only runs what's needed):
get_budget = dags.concatenate_functions(
    functions=functions,
    targets=["budget_constraint"],
    return_type="dict",
)
get_budget(income=1000, price=10)

This pattern is powerful when:
You have a complex model with many interrelated components
Different use cases require computing different subsets of outputs
You want to avoid code duplication by reusing the same function definitions
Pattern 2: Aggregating Multiple Functions
When you need to combine multiple functions into a single result (like checking if ALL constraints are satisfied), use an aggregator.
Let’s define three constraints for a consumption choice:
def positive_consumption(consumption):
    """Consumption must be positive."""
    return consumption > 0

def within_budget(consumption, budget_constraint):
    """Consumption must not exceed budget."""
    return consumption <= budget_constraint

def minimum_savings(consumption, income):
    """Must save at least 10% of income."""
    return consumption <= 0.9 * income

Now combine them with all as the aggregator:
all_feasible = dags.concatenate_functions(
    functions={
        "positive_consumption": positive_consumption,
        "within_budget": within_budget,
        "minimum_savings": minimum_savings,
    },
    targets=["positive_consumption", "within_budget", "minimum_savings"],
    aggregator=all,
)

Test with a valid choice (consumption=80 with income=100 and budget=100):

all_feasible(consumption=80, budget_constraint=100, income=100)

Now test with consumption=95 (violates the 10% savings rule):

all_feasible(consumption=95, budget_constraint=100, income=100)

You can check individual constraints to see which one failed:
print(f"positive: {positive_consumption(95)}")
print(f"within_budget: {within_budget(95, 100)}")
print(f"minimum_savings: {minimum_savings(95, 100)} <- this fails (95 > 90)")Pattern 3: Generating Functions for Multiple Scenarios¶
In economic modeling, you often need similar functions for different time periods, regions, or agent types. You can generate them programmatically.
Here’s a factory function that creates tax calculators:
def create_income_tax(rate, threshold):
    """Create a tax function with given rate and threshold."""
    def income_tax(gross_income):
        taxable = max(0, gross_income - threshold)
        return taxable * rate
    return income_tax
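As a quick sanity check, a generated calculator is an ordinary function that can be called directly; for example, with the 2021 parameters that appear in the rules below:

example_tax = create_income_tax(rate=0.27, threshold=12000)
example_tax(gross_income=55000)  # (55000 - 12000) * 0.27 = 11610.0

Define tax rules that changed over time: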
tax_rules = {
    2020: {"rate": 0.25, "threshold": 10000},
    2021: {"rate": 0.27, "threshold": 12000},
    2022: {"rate": 0.30, "threshold": 12000},
}
tax_rules

Generate year-specific tax functions using rename_arguments to give each function its own input:
functions = {}
for year, params in tax_rules.items():
    tax_func = create_income_tax(params["rate"], params["threshold"])
    functions[f"tax_{year}"] = dags.rename_arguments(
        tax_func, mapper={"gross_income": f"income_{year}"}
    )
list(functions.keys())
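Each generated function now asks for its year-specific input instead of gross_income, which you can verify with get_free_arguments (introduced in Pattern 7):

dags.get_free_arguments(functions["tax_2021"])  # expected: only 'income_2021'

Add a function that sums up all years: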
def total_tax_burden(tax_2020, tax_2021, tax_2022):
    """Sum of taxes across all years."""
    return tax_2020 + tax_2021 + tax_2022

functions["total_tax_burden"] = total_tax_burden

Combine and compute:
combined = dags.concatenate_functions(
    functions=functions,
    targets=["total_tax_burden"],
    return_type="dict",
)
combined(income_2020=50000, income_2021=55000, income_2022=60000)

Let's verify this manually:
tax_2020 = (50000 - 10000) * 0.25
tax_2021 = (55000 - 12000) * 0.27
tax_2022 = (60000 - 12000) * 0.30
print(f"2020: {tax_2020}, 2021: {tax_2021}, 2022: {tax_2022}")
print(f"Total: {tax_2020 + tax_2021 + tax_2022}")Pattern 4: Selective Computation¶
When your function graph contains expensive computations, create different combined functions that compute only what’s needed. dags automatically prunes the graph.
Here’s a simulation example:
def simulated_data(parameters, n_simulations):
    """Monte Carlo simulation (the expensive part)."""
    rng = np.random.default_rng(42)
    return rng.normal(
        loc=parameters["mean"], scale=parameters["std"], size=n_simulations
    )

def summary_statistics(simulated_data):
    """Compute mean, std from simulations."""
    return {
        "mean": float(np.mean(simulated_data)),
        "std": float(np.std(simulated_data)),
    }

def full_distribution(simulated_data):
    """Compute empirical distribution (percentiles)."""
    return {
        "p10": float(np.percentile(simulated_data, 10)),
        "p50": float(np.percentile(simulated_data, 50)),
        "p90": float(np.percentile(simulated_data, 90)),
    }

def quick_check(parameters):
    """Fast sanity check (doesn't need simulation)."""
    return all(v > 0 for v in parameters.values())

functions = {
    "simulated_data": simulated_data,
    "summary_statistics": summary_statistics,
    "full_distribution": full_distribution,
    "quick_check": quick_check,
}

Create three different combined functions for different purposes:
Validator - only runs quick_check, skips simulation entirely:
validator = dags.concatenate_functions(
    functions=functions,
    targets=["quick_check"],
    return_type="dict",
)
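Because the graph is pruned to the requested target, the validator does not even ask for n_simulations; you can check its inputs with get_free_arguments (introduced in Pattern 7):

dags.get_free_arguments(validator)  # expected: only 'parameters'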
validator(parameters={"mean": 10, "std": 2})

Summarizer - runs simulation + summary stats:
summarizer = dags.concatenate_functions(
    functions=functions,
    targets=["summary_statistics"],
    return_type="dict",
)
summarizer(parameters={"mean": 10, "std": 2}, n_simulations=1000)

Full analysis - runs everything:
full_analysis = dags.concatenate_functions(
    functions=functions,
    targets=["summary_statistics", "full_distribution"],
    return_type="dict",
)
full_analysis(parameters={"mean": 10, "std": 2}, n_simulations=1000)

Pattern 5: Dependency Analysis
Use get_ancestors to analyze which inputs affect specific outputs. This is useful for understanding model structure.
Let’s build a small income model:
def wage(education, experience):
    return 20000 + 5000 * education + 1000 * experience

def capital_income(wealth, interest_rate):
    return wealth * interest_rate

def total_income(wage, capital_income):
    return wage + capital_income

def consumption(total_income, savings_rate):
    return total_income * (1 - savings_rate)

functions = {
    "wage": wage,
    "capital_income": capital_income,
    "total_income": total_income,
    "consumption": consumption,
}

What affects consumption? (includes both functions and their inputs)
ancestors = dags.get_ancestors(
    functions=functions,
    targets=["consumption"],
    include_targets=True,
)
ancestors
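For contrast, ask for the ancestors of wage instead; only its own inputs should show up, since wealth, interest_rate, and savings_rate play no role for the wage:

dags.get_ancestors(
    functions=functions,
    targets=["wage"],
    include_targets=True,
)

What are the external inputs (parameters the user must provide)?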
all_args = set()
for func in functions.values():
    all_args.update(dags.get_free_arguments(func))
external_inputs = all_args - set(functions.keys())
external_inputs

Pattern 6: Working with Nested Structures
Use dags.tree for hierarchical function organization. This is useful when you have functions grouped by category.
Here’s a tax-transfer system organized hierarchically:
import dags.tree as dt

functions = {
    "income": {
        "wage": lambda hours, hourly_wage: hours * hourly_wage,
        "capital": lambda wealth, interest_rate: wealth * interest_rate,
    },
    "taxes": {
        "income_tax": lambda income__wage, income__capital: (
            0.3 * (income__wage + income__capital)
        ),
    },
    "transfers": {
        "basic_income": lambda: 500,
    },
    "net_income": lambda income__wage,
    income__capital,
    taxes__income_tax,
    transfers__basic_income: (
        income__wage + income__capital - taxes__income_tax + transfers__basic_income
    ),
}

Flatten the nested structure to qualified names (using __ as separator):
flat_functions = dt.flatten_to_qnames(functions)
list(flat_functions.keys())

Now combine and run:
combined = dags.concatenate_functions(
    functions=flat_functions,
    targets=["net_income"],
    return_type="dict",
)
combined(hours=40, hourly_wage=25, wealth=10000, interest_rate=0.05)

Let's verify:
Wage: 40 × 25 = 1000
Capital: 10000 × 0.05 = 500
Tax: 0.3 × (1000 + 500) = 450
Net: 1000 + 500 - 450 + 500 = 1550 ✓
wage = 40 * 25
capital = 10000 * 0.05
tax = 0.3 * (wage + capital)
net = wage + capital - tax + 500
print(f"Wage: {wage}, Capital: {capital}, Tax: {tax}, Net: {net}")See the Tree documentation for more details.
Pattern 7: Signature Inspection and Modification
Sometimes you need to inspect or modify function signatures. Here are the tools available:
def model(alpha, beta, gamma):
    return alpha + beta * gamma

Inspect a function's arguments:

dags.get_free_arguments(model)

Rename arguments to match your naming convention:
renamed = dags.rename_arguments(
    model,
    mapper={
        "alpha": "intercept",
        "beta": "slope",
        "gamma": "x",
    },
)
dags.get_free_arguments(renamed)

Test the renamed function:

renamed(intercept=1, slope=2, x=3)

Get type annotations from a function:
def typed_func(x: float, y: int) -> float:
    return x + y

dags.get_annotations(typed_func)

Best Practices
Use descriptive function names: Since dags uses names for dependency resolution, clear names make the DAG easier to understand and debug.
Keep functions focused: Each function should do one thing well, making the DAG modular and testable.
Document dependencies: Even though dags infers dependencies from parameter names, documenting expected inputs in docstrings helps maintainability.
Use enforce_signature=False for dynamic cases:
combined = dags.concatenate_functions(
    functions={
        "report": report,
        "statistics": statistics,
        "cleaned_data": cleaned_data,
    },
    targets=["report"],
    enforce_signature=False,
)

Set annotations for type checking:
combined = dags.concatenate_functions(
    functions={
        "report": report,
        "statistics": statistics,
        "cleaned_data": cleaned_data,
    },
    targets=["report"],
    set_annotations=True,
)
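set_annotations pays off when the component functions carry type hints themselves. A hypothetical typed variant (sketch only; the function and variable names here are illustrative, not part of the guide above) whose combined annotations should then be readable with get_annotations:

def cleaned_data_typed(raw_data: list[float]) -> list[float]:
    return [x for x in raw_data if x > 0]

typed_combined = dags.concatenate_functions(
    functions={"cleaned_data": cleaned_data_typed},
    targets=["cleaned_data"],
    set_annotations=True,
)
dags.get_annotations(typed_combined)  # should reflect raw_data: list[float]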