CarbonAware Airflow API
CarbonAwareOperator
Bases: BaseOperator
Defers execution of downstream tasks to an optimal time based on carbon intensity.
This operator uses the CarbonAware Scheduler API to find the optimal time to run tasks within a specified time window, based on carbon intensity in the specified cloud region(s).
When added to a DAG, this operator will defer execution of all downstream tasks until the optimal time for carbon intensity. This allows for carbon-aware scheduling of your workflows without modifying the tasks themselves.
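To make "optimal time" concrete, here is a minimal sketch of one way such a time could be chosen from a carbon-intensity forecast: slide a window the length of the task across the forecast and keep the start with the lowest total intensity. This is an illustration only, not the CarbonAware Scheduler API's actual algorithm. The function name `best_start_offset`, the 15-minute slot size, and the forecast values are all hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Sequence


def best_start_offset(intensities: Sequence[float], slots_needed: int) -> int:
    """Return the index of the first slot of the lowest-total run of `slots_needed` slots."""
    if slots_needed <= 0 or slots_needed > len(intensities):
        raise ValueError("task does not fit inside the forecast window")
    # Sum of the first candidate run, then slide one slot at a time.
    window_sum = sum(intensities[:slots_needed])
    best_sum, best_index = window_sum, 0
    for i in range(1, len(intensities) - slots_needed + 1):
        window_sum += intensities[i + slots_needed - 1] - intensities[i - 1]
        if window_sum < best_sum:
            best_sum, best_index = window_sum, i
    return best_index


# Hypothetical forecast: one gCO2eq/kWh value per 15-minute slot over a 120-minute window.
forecast = [420.0, 410.0, 380.0, 300.0, 290.0, 350.0, 400.0, 430.0]
slot_minutes = 15
task_duration_minutes = 30

slots_needed = -(-task_duration_minutes // slot_minutes)  # ceiling division
offset = best_start_offset(forecast, slots_needed)

window_start = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
optimal_start = window_start + timedelta(minutes=offset * slot_minutes)
```

With these made-up numbers, the lowest-intensity 30-minute run starts at the fourth slot, 45 minutes into the window. The sketch assumes the whole task must fit inside the forecast; the real service may treat the window differently.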
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution_window_minutes | int | The time window (in minutes) during which the tasks can be executed. Defaults to 60. | 60 |
task_duration_minutes | int | The expected duration of the downstream tasks in minutes. Defaults to 30. | 30 |
zone | Optional[Dict[str, str]] | Cloud provider zone to find carbon intensity for. Format: {"provider": "aws", "region": "us-east-1"}. If None, will attempt to introspect from instance metadata. Defaults to None. | None |
**kwargs | | Additional arguments passed to the BaseOperator constructor. | {} |
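The `zone` argument is a plain dictionary, so a typo in a key name is easy to miss. The sketch below shows one way to check the documented shape before constructing the operator. `check_zone` and `REQUIRED_ZONE_KEYS` are hypothetical names, not part of the provider package, which may perform its own validation.

```python
from typing import Dict, Optional

# Keys taken from the documented format: {"provider": "aws", "region": "us-east-1"}
REQUIRED_ZONE_KEYS = ("provider", "region")


def check_zone(zone: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    """Return the zone unchanged if it matches the documented shape.

    None is passed through, since the operator documents None as
    "introspect from instance metadata".
    """
    if zone is None:
        return None
    missing = [key for key in REQUIRED_ZONE_KEYS if not zone.get(key)]
    if missing:
        raise ValueError(f"zone is missing required keys: {', '.join(missing)}")
    return dict(zone)


zone = check_zone({"provider": "aws", "region": "us-east-1"})
```

The returned value can then be passed as `zone=zone` to the operator.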
Example
with DAG(...) as dag:
    # This operator will determine the optimal time to run
    carbon_aware = CarbonAwareOperator(
        task_id="wait_for_optimal_carbon",
        execution_window_minutes=120,  # Look for optimal time in the next 2 hours
        task_duration_minutes=30,  # Expected duration of downstream tasks
        zone={"provider": "aws", "region": "us-east-1"},
    )

    # These tasks will run at the optimal time
    task1 = PythonOperator(...)
    task2 = BashOperator(...)

    # Define dependencies
    carbon_aware >> [task1, task2]
Source code in .venv/lib/python3.13/site-packages/airflow_provider_carbonaware/operators/carbonaware.py
execute(context)
Determines the optimal time to execute based on carbon intensity and defers execution to that time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | Context | The execution context. | required |
Returns:
Name | Type | Description |
---|---|---|
Any | Any | None if executing immediately, otherwise defers execution. |
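The "run now or defer" decision described above can be sketched in plain Python. This is a simplified illustration, not the operator's source: `deferral_target` is a hypothetical helper, and the comparison rule (run immediately when the optimal start is not in the future) is an assumption. In a deferrable Airflow operator, the deferral itself would typically be done by calling `self.defer(...)` with a trigger and `method_name="execute_complete"`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def deferral_target(optimal_start: datetime, now: datetime) -> Optional[datetime]:
    """Return the time to defer until, or None when the task should run immediately.

    Assumption: an optimal start that is now or already past means "run now".
    """
    if optimal_start <= now:
        return None
    return optimal_start


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)

# Optimal time is already here: execute immediately (execute would return None).
run_now = deferral_target(now, now)

# Optimal time is 20 minutes away: the operator would defer until then.
run_later = deferral_target(now + timedelta(minutes=20), now)
```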
execute_complete(context, event=None)
Callback for deferred execution. This is called when the trigger fires.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | Context | The execution context. | required |
event | Optional[Dict[str, Any]] | The event data from the trigger. Defaults to None. | None |
|
Returns:
Name | Type | Description |
---|---|---|
Any | Any | None, allowing downstream tasks to proceed. |