
Airflow DAG Analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability and performance.

Overview

This skill examines Apache Airflow DAG definitions to identify performance bottlenecks, reliability issues, and best practice violations. It provides recommendations for task dependency optimization, parallelism configuration, error handling, and resource management.

Capabilities

  • DAG structure analysis and validation - Parse and validate DAG structure
  • Task dependency optimization - Identify bottlenecks and suggest parallel execution
  • Parallelism and concurrency recommendations - Optimize pool and slot allocation
  • SLA and timeout configuration - Recommend appropriate timeouts and SLAs
  • Retry and failure handling patterns - Validate retry logic and alerting
  • Sensor optimization - Smart sensors, deferrable operators, reschedule mode
  • Resource pool allocation - Optimize pool usage and worker distribution
  • DAG scheduling optimization - Catchup, backfill, and schedule interval tuning
  • Cross-DAG dependency detection - Identify external dependencies and triggers
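
The dependency-optimization capability above hinges on finding the critical path through the task graph. A minimal sketch of that idea, using illustrative names (`critical_path` and its inputs are not the skill's actual internals), computes the longest-duration chain from per-task durations and dependency edges:

```python
# Hypothetical sketch of critical-path analysis: given task durations and
# dependency edges, find the longest-duration path through the DAG.
from collections import defaultdict

def critical_path(durations, edges):
    """Return (total_duration, path) for the longest path in a task DAG."""
    upstream = defaultdict(list)
    for src, dst in edges:
        upstream[dst].append(src)

    best = {}  # task -> (cumulative duration, path ending at task)

    def visit(task):
        if task in best:
            return best[task]
        preds = upstream[task]
        if not preds:
            best[task] = (durations[task], [task])
        else:
            dur, path = max(visit(p) for p in preds)
            best[task] = (dur + durations[task], path + [task])
        return best[task]

    return max(visit(t) for t in durations)

durations = {"extract": 600, "transform": 1800, "load": 1200}
edges = [("extract", "transform"), ("transform", "load")]
total, path = critical_path(durations, edges)
# total == 3600, path == ["extract", "transform", "load"]
```

Tasks off the critical path are candidates for the parallelization rewrites shown under Optimization Patterns.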

Input Schema

{
  "dagCode": {
    "type": "string",
    "description": "The Python DAG definition code",
    "required": true
  },
  "dagId": {
    "type": "string",
    "description": "The DAG identifier"
  },
  "executionHistory": {
    "type": "object",
    "description": "Historical execution metrics",
    "properties": {
      "runs": {
        "type": "array",
        "items": {
          "dagRunId": "string",
          "executionDate": "string",
          "duration": "number",
          "state": "string",
          "taskDurations": "object"
        }
      }
    }
  },
  "clusterConfig": {
    "type": "object",
    "properties": {
      "workerCount": "number",
      "executorType": "string",
      "poolConfigs": "object",
      "airflowVersion": "string"
    }
  },
  "analysisScope": {
    "type": "array",
    "items": {
      "type": "string",
      "enum": ["structure", "performance", "reliability", "resources", "security"]
    },
    "default": ["structure", "performance", "reliability"]
  }
}

Output Schema

{
  "validationResults": {
    "errors": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "error"
      }
    },
    "warnings": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "warning"
      }
    }
  },
  "optimizations": {
    "type": "array",
    "items": {
      "category": "string",
      "current": "string",
      "recommended": "string",
      "impact": "high|medium|low",
      "effort": "string",
      "codeChange": "string"
    }
  },
  "recommendedConfig": {
    "type": "object",
    "properties": {
      "poolSize": "number",
      "maxActiveRuns": "number",
      "concurrency": "number",
      "defaultRetries": "number",
      "executionTimeout": "string"
    }
  },
  "dependencyGraph": {
    "type": "object",
    "properties": {
      "nodes": "array",
      "edges": "array",
      "criticalPath": "array",
      "parallelGroups": "array"
    }
  },
  "metrics": {
    "taskCount": "number",
    "maxDepth": "number",
    "parallelizationRatio": "number",
    "estimatedDuration": "string"
  },
  "securityFindings": {
    "type": "array",
    "items": {
      "severity": "high|medium|low",
      "finding": "string",
      "recommendation": "string"
    }
  }
}
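
The `metrics` fields above can be derived directly from the dependency graph. A sketch of one plausible definition (assumed here, the skill's exact formulas may differ): `maxDepth` is the longest dependency chain, and `parallelizationRatio` divides task count by that depth.

```python
# Illustrative computation of the metrics fields from tasks and edges.
from collections import defaultdict

def dag_metrics(tasks, edges):
    upstream = defaultdict(list)
    for src, dst in edges:
        upstream[dst].append(src)

    depth = {}  # task -> length of longest chain ending at task

    def visit(task):
        if task not in depth:
            depth[task] = 1 + max((visit(p) for p in upstream[task]), default=0)
        return depth[task]

    max_depth = max(visit(t) for t in tasks)
    return {
        "taskCount": len(tasks),
        "maxDepth": max_depth,
        "parallelizationRatio": round(len(tasks) / max_depth, 2),
    }

metrics = dag_metrics(
    tasks=["t1", "t2", "t3", "t4"],
    edges=[("t1", "t2"), ("t1", "t3"), ("t2", "t4"), ("t3", "t4")],
)
# A diamond of 4 tasks with depth 3 gives a ratio of 1.33
```

A ratio near 1.0 signals a mostly sequential DAG; higher values indicate more exploitable parallelism.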

Usage Examples

Basic DAG Analysis

{
  "dagCode": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "dagId": "daily_etl_pipeline"
}

With Execution History

{
  "dagCode": "...",
  "dagId": "daily_etl_pipeline",
  "executionHistory": {
    "runs": [
      {
        "dagRunId": "manual__2024-01-15",
        "duration": 3600,
        "state": "success",
        "taskDurations": {
          "extract": 600,
          "transform": 1800,
          "load": 1200
        }
      }
    ]
  }
}

Full Analysis with Cluster Config

{
  "dagCode": "...",
  "dagId": "complex_ml_pipeline",
  "clusterConfig": {
    "workerCount": 8,
    "executorType": "KubernetesExecutor",
    "poolConfigs": {
      "default_pool": {"slots": 128},
      "ml_pool": {"slots": 32}
    },
    "airflowVersion": "2.8.0"
  },
  "analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}

Validation Rules

DAG Definition Rules

| Rule | Severity | Description |
| --- | --- | --- |
| DAG-001 | Error | Missing DAG default_args |
| DAG-002 | Error | Invalid schedule_interval |
| DAG-003 | Warning | Catchup enabled for long-running DAG |
| DAG-004 | Warning | No email on failure configured |
| DAG-005 | Info | Consider using @dag decorator |
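
A rule such as DAG-001 can be implemented by walking the parsed DAG code and flagging `DAG(...)` calls with no `default_args` keyword. This is a hypothetical sketch of the detection idea only; the real analyzer is assumed to do far more:

```python
# Sketch of a DAG-001 style check: flag DAG(...) calls missing default_args.
import ast

def check_dag_001(dag_code):
    findings = []
    for node in ast.walk(ast.parse(dag_code)):
        if isinstance(node, ast.Call):
            func = node.func
            name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", "")
            if name == "DAG" and not any(kw.arg == "default_args" for kw in node.keywords):
                findings.append({
                    "code": "DAG-001",
                    "message": "Missing DAG default_args",
                    "line": node.lineno,
                    "severity": "error",
                })
    return findings

code = "dag = DAG('daily_etl_pipeline', schedule_interval='@daily')"
# check_dag_001(code) reports one DAG-001 error at line 1
```

The finding dictionaries mirror the `validationResults.errors` item shape in the output schema.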

Task Definition Rules

| Rule | Severity | Description |
| --- | --- | --- |
| TSK-001 | Error | Task has no upstream or downstream |
| TSK-002 | Warning | Task missing retries configuration |
| TSK-003 | Warning | Execution timeout not set |
| TSK-004 | Warning | PythonOperator with no pool |
| TSK-005 | Info | Consider TaskGroup for related tasks |

Sensor Rules

| Rule | Severity | Description |
| --- | --- | --- |
| SEN-001 | Warning | Sensor in poke mode (use reschedule) |
| SEN-002 | Warning | Sensor missing timeout |
| SEN-003 | Info | Consider deferrable operator |
| SEN-004 | Warning | External sensor without soft_fail |

Security Rules

| Rule | Severity | Description |
| --- | --- | --- |
| SEC-001 | Error | Hardcoded credentials |
| SEC-002 | Warning | Using Variable.get without default |
| SEC-003 | Warning | Connection ID not parameterized |
| SEC-004 | Info | Consider Secrets Backend |
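
A SEC-001 scan can be as simple as pattern-matching assignments that look like embedded secrets. The patterns below are illustrative only; a production scanner would need much broader coverage:

```python
# Illustrative SEC-001 scan: flag assignments that look like hardcoded secrets.
import re

SECRET_PATTERN = re.compile(
    r"""(?i)(password|secret|api_key|token)\s*=\s*['"][^'"]+['"]"""
)

def scan_for_secrets(dag_code):
    findings = []
    for lineno, line in enumerate(dag_code.splitlines(), start=1):
        if SECRET_PATTERN.search(line):
            findings.append({
                "severity": "high",
                "finding": f"Possible hardcoded credential on line {lineno}",
                "recommendation": "Load secrets from a Connection or Secrets Backend",
            })
    return findings

code = 'db_password = "hunter2"\nconn_id = "warehouse"\n'
# scan_for_secrets(code) flags line 1 only
```

The result entries follow the `securityFindings` item shape in the output schema.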

Optimization Patterns

Parallelization

# Before: Sequential execution
task1 >> task2 >> task3 >> task4

# After: Parallel execution where possible
task1 >> [task2, task3] >> task4

Sensor Optimization

# Before: Poke mode (blocks worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='poke'  # Bad
)

# After: Reschedule mode (releases worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='reschedule',  # Good
    poke_interval=300
)

# Best: Deferrable mode (deferrable operators landed in Airflow 2.2;
# FileSensor's deferrable flag requires a newer 2.x release)
from airflow.sensors.filesystem import FileSensor
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    deferrable=True
)

TaskGroups

# Before: Flat task structure
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products

# After: TaskGroups for organization
from airflow.utils.task_group import TaskGroup

with TaskGroup('orders') as orders_group:
    extract >> transform >> load

with TaskGroup('products') as products_group:
    extract >> transform >> load

Dynamic Task Mapping (Airflow 2.3+)

# Before: Static task generation
for i in range(10):
    PythonOperator(task_id=f'process_{i}', ...)

# After: Dynamic task mapping
from airflow.decorators import task

@task
def process_item(item):
    return item * 2

process_item.expand(item=[1, 2, 3, 4, 5])

Configuration Recommendations

Default Args Template

from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1),
}

Pool Configuration

| Workload Type | Recommended Pool Size |
| --- | --- |
| Heavy compute | 2-4 per worker |
| I/O bound | 8-16 per worker |
| API calls | Rate limit based |
| Sensors | Separate pool, high slots |
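
The sizing heuristics in the table can be expressed as a small helper. This is a hypothetical sketch: the per-worker factors mirror the table, but the chosen midpoints (3 for compute, 12 for I/O) and the sensor multiplier are assumptions.

```python
# Pool-size recommendation helper based on the workload table above.
def recommend_pool_size(workload, worker_count, rate_limit=None):
    if workload == "heavy_compute":
        return worker_count * 3    # midpoint of 2-4 slots per worker
    if workload == "io_bound":
        return worker_count * 12   # midpoint of 8-16 slots per worker
    if workload == "api_calls":
        if rate_limit is None:
            raise ValueError("api_calls sizing needs a rate limit")
        return rate_limit          # cap pool at the external rate limit
    if workload == "sensors":
        return worker_count * 16   # separate pool, high slot count
    raise ValueError(f"unknown workload: {workload}")

recommend_pool_size("io_bound", worker_count=8)  # → 96
```

Keeping sensors in their own generously sized pool prevents long-polling tasks from starving compute-heavy work.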

Integration Points

MCP Server Integration

  • yangkyeongmo/mcp-server-apache-airflow - Airflow REST API integration
  • Dagster MCP - Alternative orchestration patterns
  • Prefect MCP - Modern orchestration comparison

Related Skills

  • dbt Project Analyzer (SK-DEA-003) - dbt operator optimization
  • Data Lineage Mapper (SK-DEA-010) - Task lineage extraction

Applicable Processes

  • ETL/ELT Pipeline (etl-elt-pipeline.js)
  • A/B Testing Pipeline (ab-testing-pipeline.js)
  • Pipeline Migration (pipeline-migration.js)
  • Data Quality Framework (data-quality-framework.js)

Version History

  • 1.0.0 - Initial release with Airflow 2.x support


Metadata

  • License: unknown
  • Version: 1.0.0
  • Updated: 2/5/2026
  • Publisher: a5c-ai

Tags

  • api, ci-cd, github, observability, security