A

airflow-dag-patterns

Verified

by wshobson · Updated Apr 16, 2026

opus
18in9.4Kout
$0.682m 55s

Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.

airflowdata-pipelinesdag-authoringworkflow-orchestrationpython

See It In Action

1 turns·claude
The project codebase is in your current working directory. Explore the files to understand the code before making changes. need the daily product-usage airflow dag today for tomorrow's exec metrics review — ingest app events, validate partitions, do incremental transforms, load warehouse tables

Let me explore the codebase first, then build the DAG.

Now I have the full picture. Let me invoke the Airflow skill for best practices, then build the DAG and tests.

I have everything I need. Let me build the DAG and tests.

Now let me add tests for the new DAG.

DAG and tests are ready. Here's what was created:

dags/daily_product_usage.py — full pipeline with 9 tasks:

start → wait_for_partition → validate_partition → transform_clean_events
  ├── load_clean_events ──────────────────┐
  └── transform_daily_aggregates          │
       └── load_daily_aggregates ─────────┤
                                    notify_success → end
StageTaskDetails
Ingestwait_for_partitionS3KeySensor in reschedule mode, polls every 5min, 2hr timeout
Validatevalidate_partitionChecks min 100k rows, dup rate < 0.1%, null rate < 2% on key columns
Transformtransform_clean_eventsDedupes on event_id, drops null key-column rows, adds ingested_at, writes staging parquet
Transformtransform_daily_aggregatesComputes unique_users, total_events, avg/p95_session_duration, new/returning_users per feature x platform
Loadload_clean_eventsDELETE+COPY into analytics.product_usage_events_clean (idempotent)
Loadload_daily_aggregatesDELETE+COPY into analytics.product_usage_daily (idempotent)
Notifynotify_successSlack summary to #data-pipeline-runs; failure callback posts to #data-alerts

Key conventions matched from the existing health_check_dag.py:

  • TaskFlow API, catchup=False, max_active_runs=1
  • Exponential backoff retries (3 retries, 5min base, 60min cap)
  • data-team owner, proper email list
  • Date-partitioned with {{ ds }} templating
  • XCom passes S3 path references, not data

tests/test_dag_integrity.py — 8 new test cases covering DAG existence, schedule, tags, task count, task names, sensor mode, and dependency chain.

Claude Sonnet 4.6

External Tools

ToolType
python3binary

Permissions

ScopeDescription
filesystem:read
filesystem:write
process:spawn

SKILL.md

Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

When to Use This Skill

  • Creating data pipeline orchestration with Airflow
  • Designing DAG structures and dependencies
  • Implementing custom operators and sensors
  • Testing Airflow DAGs locally
  • Setting up Airflow in production
  • Debugging failed DAG runs

Core Concepts

1. DAG Design Principles

PrincipleDescription
IdempotentRunning twice produces same result
AtomicTasks succeed or fail completely
IncrementalProcess only new/changed data
ObservableLogs, metrics, alerts at every step

2. Task Dependencies

# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4

Quick Start

# dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end

Patterns

Pattern 1: TaskFlow API (Airflow 2.0+)

# dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.models import Variable

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str) -> dict:
        """Extract data from source"""
        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str):
        """Load data to target"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# Instantiate the DAG
taskflow_etl()

Pattern 2: Dynamic DAG Generation

# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import json

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""

    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)

Pattern 3: Branching and Conditional Logic

# dags/branching_example.py
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()

Pattern 4: Sensors and External Dependencies

# dags/sensor_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process

Pattern 5: Error Handling and Alerts

# dags/error_handling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable

def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]

Pattern 6: Testing DAGs

# tests/test_dags.py
import pytest
from datetime import datetime
from airflow.models import DagBag

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """Test DAG has no cycles and is valid"""
    for dag_id, dag in dagbag.dags.items():
        assert dag.test_cycle() is None, f"Cycle detected in {dag_id}"

# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)

Project Structure

airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt

Best Practices

Do's

  • Use TaskFlow API - Cleaner code, automatic XCom
  • Set timeouts - Prevent zombie tasks
  • Use mode='reschedule' - For sensors, free up workers
  • Test DAGs - Unit tests and integration tests
  • Idempotent tasks - Safe to retry

Don'ts

  • Don't use depends_on_past=True - Creates bottlenecks
  • Don't hardcode dates - Use {{ ds }} macros
  • Don't use global state - Tasks should be stateless
  • Don't skip catchup blindly - Understand implications
  • Don't put heavy logic in DAG file - Import from modules

FAQ

What does airflow-dag-patterns do?

Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.

When should I use airflow-dag-patterns?

Use it when you need a repeatable workflow that produces source code, code diff.

What does airflow-dag-patterns output?

In the evaluated run it produced source code, code diff.

How do I install or invoke airflow-dag-patterns?

Ask the agent to use this skill when the task matches its documented workflow.

Which agents does airflow-dag-patterns support?

Agent support is inferred from the source, but not explicitly declared.

What tools, channels, or permissions does airflow-dag-patterns need?

It uses python3; channels commonly include code, diff; permissions include filesystem:read, filesystem:write, process:spawn.

Is airflow-dag-patterns safe to install?

Static analysis marked this skill as medium risk; review side effects and permissions before enabling it.

How is airflow-dag-patterns different from an MCP or plugin?

A skill packages instructions and workflow conventions; tools, MCP servers, and plugins are dependencies the skill may call during execution.

Does airflow-dag-patterns outperform not using a skill?

About airflow-dag-patterns

When to use airflow-dag-patterns

When creating new Airflow DAGs for ETL or batch processing workflows. When refactoring existing DAGs to use better dependency patterns, sensors, branching, or TaskFlow APIs. When testing or debugging Airflow pipeline code locally before deployment.

When airflow-dag-patterns is not the right choice

When you need the agent to operate a live Airflow environment directly through an external service integration. When the task is general data processing code that does not involve Airflow DAG design or orchestration.

What it produces

Produces source code and code diff.