
CarbonAware Airflow API

CarbonAwareOperator

Bases: BaseOperator

Defers execution of downstream tasks to an optimal time based on carbon intensity.

This operator uses the CarbonAware Scheduler API to find the optimal time to run tasks within a specified time window, based on carbon intensity in the specified cloud region(s).

When added to a DAG, this operator will defer execution of all downstream tasks until the optimal time for carbon intensity. This allows for carbon-aware scheduling of your workflows without modifying the tasks themselves.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| execution_window_minutes | int | The time window (in minutes) during which the tasks can be executed. Defaults to 60. | 60 |
| task_duration_minutes | int | The expected duration of the downstream tasks in minutes. Defaults to 30. | 30 |
| zone | Optional[Dict[str, str]] | Cloud provider zone to find carbon intensity for. Format: {"provider": "aws", "region": "us-east-1"}. If None, will attempt to introspect from instance metadata. Defaults to None. | None |
| **kwargs | | Additional arguments passed to the BaseOperator constructor. | {} |

Example
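# Import path follows the provider's module layout. DAG, PythonOperator and
# BashOperator come from Airflow itself (their import paths vary by version).
from airflow_provider_carbonaware.operators.carbonaware import CarbonAwareOperator

with DAG(...) as dag: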
    # This operator will determine the optimal time to run
    carbon_aware = CarbonAwareOperator(
        task_id="wait_for_optimal_carbon",
        execution_window_minutes=120,  # Look for optimal time in the next 2 hours
        task_duration_minutes=30,      # Expected duration of downstream tasks
        zone={"provider": "aws", "region": "us-east-1"},
    )

    # These tasks will run at the optimal time
    task1 = PythonOperator(...)
    task2 = BashOperator(...)

    # Define dependencies
    carbon_aware >> [task1, task2]
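
To make the interplay of execution_window_minutes and task_duration_minutes more concrete, here is a minimal sketch of how a start time could be chosen from a carbon-intensity forecast. It is not the provider's implementation: the `pick_start_time` helper, the forecast format (a list of slot start times with intensity values), and the rule that the task must start inside the window are all assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Assumed forecast format: (slot start, carbon intensity) pairs.
Forecast = List[Tuple[datetime, float]]


def pick_start_time(
    forecast: Forecast,
    now: datetime,
    execution_window_minutes: int = 60,
    task_duration_minutes: int = 30,
) -> datetime:
    """Hypothetical helper: return the slot start with the lowest mean
    intensity over the task duration, or `now` if no slot qualifies."""
    # Assumption: the task must *start* within the execution window.
    window_end = now + timedelta(minutes=execution_window_minutes)
    duration = timedelta(minutes=task_duration_minutes)

    best_start, best_score = now, float("inf")
    for start, _ in sorted(forecast):
        if not (now <= start <= window_end):
            continue
        # Average the intensity of every slot that begins while the task runs.
        covered = [v for t, v in forecast if start <= t < start + duration]
        score = sum(covered) / len(covered)
        if score < best_score:  # strict "<" keeps the earliest slot on ties
            best_start, best_score = start, score
    return best_start
```

With 15-minute slots, a 60-minute window, and a 30-minute task, the helper compares the two-slot average for each candidate start and returns the earliest start with the lowest average.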
Source code in .venv/lib/python3.13/site-packages/airflow_provider_carbonaware/operators/carbonaware.py
def __init__(
    self,
    *,
    execution_window_minutes: int = 60,
    task_duration_minutes: int = 30,
    zone: Optional[Dict[str, str]] = None,
    **kwargs,
) -> None:
    super().__init__(**kwargs)
    self.execution_window_minutes = execution_window_minutes
    self.task_duration_minutes = task_duration_minutes
    self.zone = zone

execute(context)

Determines the optimal time to execute based on carbon intensity and defers execution to that time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The execution context. | required |

Returns:

| Type | Description |
| --- | --- |
| Any | None if executing immediately, otherwise defers execution. |

Source code in .venv/lib/python3.13/site-packages/airflow_provider_carbonaware/operators/carbonaware.py
def execute(self, context: Context) -> Any:
    """
    Determines the optimal time to execute based on carbon intensity and defers execution to that time.

    Args:
        context (Context): The execution context.

    Returns:
        Any: None if executing immediately, otherwise defers execution.
    """
    # Find the optimal time and defer
    self.log.info("Finding optimal execution time based on carbon intensity")
    optimal_time = self._find_optimal_time()

    # If optimal time is now or in the past, execute immediately
    now = datetime.now(timezone.utc)
    if optimal_time <= now:
        self.log.info(
            "Optimal time is now or in the past, proceeding with execution"
        )
        return None

    # Otherwise, defer to the optimal time
    self.log.info(f"Deferring execution to optimal time: {optimal_time}")
    self.defer(
        trigger=DateTimeTrigger(moment=optimal_time, end_from_trigger=True),
        method_name="execute_complete",
    )
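
The run-now-or-defer decision in execute() can be isolated as a small pure function, which makes it easy to test without Airflow. The sketch below is illustrative only: `defer_delay` is a hypothetical name, and the explicit timezone check is an addition, included because `now` is timezone-aware and Python raises TypeError when a naive datetime is compared with an aware one.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def defer_delay(
    optimal_time: datetime, now: Optional[datetime] = None
) -> Optional[timedelta]:
    """Hypothetical helper: return how long to wait before running,
    or None when the task should proceed immediately."""
    if optimal_time.tzinfo is None:
        # Fail with a clear message instead of a TypeError on comparison.
        raise ValueError("optimal_time must be timezone-aware")
    if now is None:
        now = datetime.now(timezone.utc)
    # Same rule as execute(): now or in the past means run immediately.
    if optimal_time <= now:
        return None
    return optimal_time - now
```

A return value of None corresponds to the branch that logs and returns, and any timedelta corresponds to the branch that calls self.defer().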

execute_complete(context, event=None)

Callback for deferred execution. This is called when the trigger fires.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The execution context. | required |
| event | Optional[Dict[str, Any]] | The event data from the trigger. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | None, allowing downstream tasks to proceed. |

Source code in .venv/lib/python3.13/site-packages/airflow_provider_carbonaware/operators/carbonaware.py
def execute_complete(
    self, context: Context, event: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Callback for deferred execution. This is called when the trigger fires.

    Args:
        context (Context): The execution context.
        event (Optional[Dict[str, Any]], optional): The event data from the trigger.
            Defaults to None.

    Returns:
        Any: None, allowing downstream tasks to proceed.
    """
    moment = event.get("moment") if event else None
    self.log.info(f"Reached optimal carbon intensity time: {moment}")
    return None
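
The callback above reads the trigger payload as a mapping with a "moment" key. The payload shape depends on the trigger that fired, so a more defensive lookup may be useful for logging. The following is a minimal sketch: `moment_from_event` is a hypothetical helper, and the idea that the payload could also arrive as a bare datetime or string is an assumption, not something the source confirms.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def moment_from_event(event: Any) -> Optional[str]:
    """Hypothetical helper: extract a printable moment from a trigger
    payload, or return None when no moment is available."""
    if event is None:
        return None
    # Mapping payload: look up the "moment" key, as execute_complete() does.
    # Anything else is treated as the moment itself (assumed payload shape).
    moment = event.get("moment") if isinstance(event, dict) else event
    if moment is None:
        return None
    if isinstance(moment, datetime):
        return moment.isoformat()
    return str(moment)
```

The helper returns a string in every non-empty case, so it can be dropped straight into a log message regardless of which payload shape arrives.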