Updated 1/26/2026

Workflow Orchestration Skill

Description

Manages Apache Airflow DAGs for orchestrating OpenGov Harvester extraction workflows.

Triggers

  • "start airflow"
  • "trigger dag"
  • "check task status"
  • "airflow orchestration"
  • "schedule extraction"

Airflow Architecture

Deployment

  • Method: Docker Compose
  • Services: Webserver, Scheduler, Postgres, Redis
  • Web UI: http://localhost:8080
  • Credentials: airflow / airflow (default)

DAG Structure

Core DAGs:

  1. opengov_master_pipeline: End-to-end orchestration (Inventory → Extraction → Report)
  2. opengov_inventory_queue_manager: Daily project list refresh
  3. opengov_queue_worker_pool: Parallel extraction with dynamic task mapping

Common Operations

Start/Stop Airflow

Start Services:

./scripts/airflow_start.sh

# Or manually
docker-compose -f airflow/docker-compose.yml up -d

Stop Services:

./scripts/airflow_stop.sh

# Or manually
docker-compose -f airflow/docker-compose.yml down

Check Status:

docker ps | grep airflow

# View logs
./scripts/airflow_logs.sh

DAG Management

List DAGs:

docker exec -it airflow-scheduler airflow dags list

Trigger DAG:

# Trigger with default config
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline

# Trigger with parameters
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline \
  --conf '{"backend": "supabase", "batch_size": 10}'
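When the trigger is issued from a script rather than typed by hand, serializing the configuration with `json.dumps` avoids shell-quoting mistakes in the `--conf` argument. A minimal sketch, assuming the `airflow-scheduler` container name used above; `build_trigger_command` is a hypothetical helper and not part of Airflow:

```python
import json


def build_trigger_command(dag_id, conf=None, container="airflow-scheduler"):
    """Return the argv list for triggering a DAG via `docker exec`.

    Hypothetical helper: the container name mirrors the commands above
    and may differ in your deployment.
    """
    cmd = ["docker", "exec", container, "airflow", "dags", "trigger", dag_id]
    if conf:
        # json.dumps produces valid JSON, so no manual quoting is needed
        cmd += ["--conf", json.dumps(conf)]
    return cmd


cmd = build_trigger_command(
    "opengov_master_pipeline", {"backend": "supabase", "batch_size": 10}
)
print(cmd)
```

Passing the list to `subprocess.run(cmd, check=True)` runs the command without invoking a shell.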

Pause/Unpause DAG:

docker exec -it airflow-scheduler airflow dags pause opengov_master_pipeline
docker exec -it airflow-scheduler airflow dags unpause opengov_master_pipeline

Task Management

List Tasks:

docker exec -it airflow-scheduler airflow tasks list opengov_master_pipeline

Test Task:

docker exec -it airflow-scheduler airflow tasks test \
  opengov_master_pipeline \
  extract_opportunities \
  2026-01-25

View Task Logs:

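# The Airflow CLI has no `tasks logs` subcommand; read the log file instead
# (path layout as described under Logging)
tail -n 100 airflow/logs/opengov_master_pipeline/extract_opportunities/2026-01-25T12:00:00+00:00/1.log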

Monitoring

View DAG Runs:

docker exec -it airflow-scheduler airflow dags list-runs \
  -d opengov_master_pipeline \
  --state running

View Task Instances:

docker exec -it airflow-scheduler airflow tasks states-for-dag-run \
  opengov_master_pipeline \
  manual__2026-01-25T12:00:00+00:00
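For scripted monitoring, the list commands accept `--output json`. The sketch below counts runs per state from that output. The sample payload and its field names (`run_id`, `state`) are assumptions about the JSON shape and should be checked against your Airflow version:

```python
import json
from collections import Counter


def count_run_states(list_runs_json):
    """Count DAG runs per state from `airflow dags list-runs -o json` output."""
    runs = json.loads(list_runs_json)
    return Counter(run.get("state", "unknown") for run in runs)


# Illustrative payload, not real output
sample = json.dumps([
    {"run_id": "manual__2026-01-25T12:00:00+00:00", "state": "running"},
    {"run_id": "scheduled__2026-01-24T00:00:00+00:00", "state": "success"},
    {"run_id": "scheduled__2026-01-23T00:00:00+00:00", "state": "success"},
])
states = count_run_states(sample)
print(dict(states))
```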

DAG Configuration

Master Pipeline DAG

# airflow/dags/opengov_master_pipeline.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    description='Complete OpenGov extraction pipeline',
    schedule_interval='@daily',  # this argument is named `schedule` from Airflow 2.4 onward
    catchup=False,
    tags=['opengov', 'extraction', 'master'],
) as dag:

    # Task 1: Inventory collection
    collect_inventory = BashOperator(
        task_id='collect_inventory',
        bash_command='python /app/scripts/etl/fetch_project_data.py',
    )

    # Task 2: Sequential extraction
    extract_opportunities = BashOperator(
        task_id='extract_opportunities',
        bash_command='timeout 600 python /app/scripts/etl/step_3_sequential_extraction.py',
    )

    # Task 3: Generate report
    generate_report = BashOperator(
        task_id='generate_report',
        bash_command='python /app/scripts/generate_report.py',
    )

    # Define task dependencies
    collect_inventory >> extract_opportunities >> generate_report
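With `retries` set to 3 and a five-minute `retry_delay`, a task can run up to four times. The sketch below estimates the worst-case wall-clock time for `extract_opportunities`, assuming every attempt runs the full 600 seconds allowed by `timeout` and ignoring scheduler latency:

```python
from datetime import timedelta


def worst_case_duration(attempt_timeout, retries, retry_delay):
    """Upper bound on task time: every attempt times out, every retry waits."""
    attempts = retries + 1  # the first try plus each retry
    return attempts * attempt_timeout + retries * retry_delay


total = worst_case_duration(
    attempt_timeout=timedelta(seconds=600),  # from `timeout 600` in the bash command
    retries=3,                               # from default_args
    retry_delay=timedelta(minutes=5),        # from default_args
)
print(total)  # 0:55:00
```

A bound like this is useful when choosing the schedule: a daily run leaves ample room, while a much shorter interval could overlap with retries.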

Dynamic Task Mapping

# Dynamic fan-out for parallel processing
from airflow.decorators import task

@task
def get_project_ids():
    """Fetch the list of project IDs that still need extraction."""
    import sqlite3
    from contextlib import closing
    # closing() releases the connection even if the query fails
    with closing(sqlite3.connect('/app/data/db/opengov_state.db')) as conn:
        cursor = conn.execute("SELECT project_id FROM opengov_projects WHERE extracted = 0 LIMIT 10")
        return [row[0] for row in cursor.fetchall()]

@task
def process_project(project_id: str):
    """Process a single project."""
    import subprocess
    # check=True raises on a non-zero exit code so the mapped task is marked failed
    subprocess.run([
        'python', '/app/scripts/etl/extract_single_project.py',
        '--project-id', str(project_id)
    ], check=True)

with DAG(...) as dag:
    project_ids = get_project_ids()
    process_project.expand(project_id=project_ids)
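The query in `get_project_ids` can be exercised outside Airflow. A sketch using an in-memory SQLite database; the two-column schema is an assumption inferred from the query above, and `fetch_pending_project_ids` is a hypothetical stand-in for the task body:

```python
import sqlite3


def fetch_pending_project_ids(conn, limit=10):
    """Return IDs of projects not yet extracted, at most `limit` of them."""
    cursor = conn.execute(
        "SELECT project_id FROM opengov_projects WHERE extracted = 0 LIMIT ?",
        (limit,),
    )
    return [row[0] for row in cursor.fetchall()]


# In-memory database with an assumed minimal schema
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE opengov_projects (project_id TEXT, extracted INTEGER)")
conn.executemany(
    "INSERT INTO opengov_projects VALUES (?, ?)",
    [("p-1", 0), ("p-2", 1), ("p-3", 0)],
)
pending = fetch_pending_project_ids(conn)
print(pending)
conn.close()
```

Each returned ID becomes one mapped `process_project` task instance when the list is passed to `.expand`.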

Related Rules

  • airflow-dynamic-mapping.md: Dynamic task mapping patterns
  • airflow-orchestrate-only.md: Keep DAGs lightweight, delegate work
  • airflow-concurrency-control.md: Manage parallelism
  • airflow-error-handling.md: Partial failure recovery
  • airflow-idempotent-backfills.md: Safe re-runs

Configuration

Environment Variables

# In docker-compose.yml or .env
AIRFLOW__CORE__EXECUTOR=LocalExecutor
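# Airflow 2.3+ reads this setting as AIRFLOW__DATABASE__SQL_ALCHEMY_CONN; the CORE name is deprecated
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow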
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

DAG Parameters

{
  "backend": "supabase",
  "batch_size": 10,
  "max_workers": 5,
  "timeout_seconds": 600,
  "enable_anti_detection": true
}
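A task that reads these parameters usually needs fallbacks for keys the caller omitted. A minimal sketch, assuming the values above serve as defaults; `resolve_params` is a hypothetical helper, and rejecting unknown keys is a design choice rather than Airflow behavior:

```python
DEFAULTS = {
    "backend": "supabase",
    "batch_size": 10,
    "max_workers": 5,
    "timeout_seconds": 600,
    "enable_anti_detection": True,
}


def resolve_params(conf=None):
    """Overlay run-time conf on the defaults, rejecting unknown keys."""
    conf = conf or {}
    unknown = set(conf) - set(DEFAULTS)
    if unknown:
        # Fail early on typos such as "batchsize" instead of ignoring them
        raise ValueError(f"Unknown DAG parameters: {sorted(unknown)}")
    return {**DEFAULTS, **conf}


params = resolve_params({"batch_size": 25})
print(params["batch_size"], params["backend"])
```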

Monitoring & Alerting

Web UI Monitoring

  1. Open http://localhost:8080
  2. Navigate to "DAGs" view
  3. Click on DAG name to see runs
  4. Click on run to see task details
  5. View logs for debugging

CLI Monitoring

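# Watch DAG runs in real-time (no -it: watch does not provide a TTY)
watch -n 5 'docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state running'

# View failed DAG runs
docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state failed

# Show unmet dependencies that keep a task instance from running
docker exec airflow-scheduler airflow tasks failed-deps opengov_master_pipeline extract_opportunities 2026-01-25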

Logging

Location: airflow/logs/
Format: <dag_id>/<task_id>/<execution_date>/<try_number>.log

# Tail logs
tail -f airflow/logs/opengov_master_pipeline/extract_opportunities/2026-01-25T12:00:00+00:00/1.log
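The log path can be built programmatically from the same four components. This sketch follows the layout stated above; deployments that set a different `log_filename_template` will produce different paths, and `task_log_path` is a hypothetical helper:

```python
from pathlib import PurePosixPath


def task_log_path(dag_id, task_id, execution_date, try_number, base="airflow/logs"):
    """Build a path in the <dag_id>/<task_id>/<execution_date>/<try_number>.log layout."""
    return str(
        PurePosixPath(base) / dag_id / task_id / execution_date / f"{try_number}.log"
    )


path = task_log_path(
    "opengov_master_pipeline",
    "extract_opportunities",
    "2026-01-25T12:00:00+00:00",
    1,
)
print(path)
```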

Troubleshooting

DAG Not Appearing

Symptoms: DAG not visible in UI
Solutions:

  1. Check DAG file syntax: python -m py_compile airflow/dags/my_dag.py
  2. Check scheduler logs: docker logs airflow-scheduler
  3. Verify DAG folder: docker exec airflow-scheduler ls /opt/airflow/dags
  4. Refresh DAGs in UI: Click refresh button
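The syntax check in step 1 can be run over the whole DAG folder at once. A minimal sketch using the built-in `compile()`, which parses each file without executing it; it catches syntax errors only, not import errors or missing dependencies, and `find_syntax_errors` is a hypothetical helper:

```python
from pathlib import Path


def find_syntax_errors(dag_folder):
    """Parse every .py file under dag_folder and collect syntax errors by path."""
    errors = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        try:
            # compile() parses the source without executing or importing it
            compile(path.read_text(encoding="utf-8"), str(path), "exec")
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

Calling `find_syntax_errors("airflow/dags")` returns an empty dict when every file parses.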

Task Stuck in Running

Symptoms: Task shows "running" but not progressing
Solutions:

  1. Check task logs for errors
  2. Verify task process is running: docker exec airflow-scheduler ps aux
  3. Clear the stuck task instance: docker exec airflow-scheduler airflow tasks clear <dag> -t <task> -y
  4. Restart scheduler: docker restart airflow-scheduler

Connection to Worker Lost

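Symptoms: Task fails with "Worker lost" error
Note: A separate airflow-worker container exists only under CeleryExecutor. With the LocalExecutor shown under Environment Variables, tasks run inside the scheduler container, so check airflow-scheduler instead.
Solutions: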

  1. Check worker logs: docker logs airflow-worker
  2. Increase worker timeout in config
  3. Check resource limits: docker stats
  4. Restart worker: docker restart airflow-worker

Best Practices

DAG Design

  • ✅ Keep DAGs idempotent (safe to re-run)
  • ✅ Use task groups for logical grouping
  • ✅ Set appropriate retries and timeouts
  • ✅ Use XCom for small data passing only
  • ✅ Delegate heavy work to external scripts
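Idempotency in practice often comes down to how results are written. A sketch of a re-runnable write keyed on the project ID, using an in-memory SQLite database; the `extraction_results` table and `save_extraction` helper are hypothetical and only illustrate the pattern:

```python
import sqlite3


def save_extraction(conn, project_id, payload):
    """Write one extraction result; re-running with the same ID overwrites it."""
    conn.execute(
        "INSERT OR REPLACE INTO extraction_results (project_id, payload) VALUES (?, ?)",
        (project_id, payload),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
# Hypothetical results table; the primary key is what prevents duplicate rows
conn.execute(
    "CREATE TABLE extraction_results (project_id TEXT PRIMARY KEY, payload TEXT)"
)

save_extraction(conn, "p-1", "first attempt")
save_extraction(conn, "p-1", "retry")  # simulates a task retry or backfill

rows = conn.execute("SELECT project_id, payload FROM extraction_results").fetchall()
print(rows)
conn.close()
```

Because the second write replaces the first, a retried or backfilled run leaves one row per project rather than accumulating duplicates.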

Error Handling

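from airflow.decorators import task
from airflow.exceptions import AirflowSkipException

# validate() and process() are placeholders for project-specific helpers
@task
def process_with_validation(data):
    if not validate(data):
        # Marks this task instance as skipped rather than failed
        raise AirflowSkipException("Invalid data, skipping")
    return process(data)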

Resource Management

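# Limit concurrent tasks at the DAG level. These are DAG arguments;
# default_args is passed to operators and does not accept them.
dag = DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    max_active_runs=1,   # only one run of this DAG at a time
    max_active_tasks=5,  # at most 5 concurrent task instances for this DAG
)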

# Set task-level concurrency
extract_task = BashOperator(
    task_id='extract',
    bash_command='...',
    pool='extraction_pool',  # Custom pool with limited slots
)

Advanced Features

Branching

from airflow.operators.python import BranchPythonOperator

def choose_backend(**context):
    backend = context['dag_run'].conf.get('backend', 'sqlite')
    return f'extract_to_{backend}'

branch = BranchPythonOperator(
    task_id='choose_backend',
    python_callable=choose_backend,
)

extract_sqlite = BashOperator(task_id='extract_to_sqlite', ...)
extract_supabase = BashOperator(task_id='extract_to_supabase', ...)

branch >> [extract_sqlite, extract_supabase]
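The branch callable can be tested without a running scheduler. The sketch below adds a guard for unexpected `backend` values, which is an addition to the original logic, and uses `SimpleNamespace` as a stand-in for Airflow's DAG run object:

```python
from types import SimpleNamespace

ALLOWED_BACKENDS = {"sqlite", "supabase"}


def choose_backend(**context):
    """Return the task_id to follow, falling back to sqlite for bad input."""
    conf = context["dag_run"].conf or {}  # tolerate a missing or empty conf
    backend = conf.get("backend", "sqlite")
    if backend not in ALLOWED_BACKENDS:
        # Guard against returning a task_id that does not exist downstream
        backend = "sqlite"
    return f"extract_to_{backend}"


# SimpleNamespace stands in for the DAG run object in this sketch
fake_run = SimpleNamespace(conf={"backend": "supabase"})
print(choose_backend(dag_run=fake_run))
```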

Dynamic DAG Generation

# Generate multiple similar DAGs
for project_type in ['federal', 'state', 'local']:
    dag_id = f'opengov_extract_{project_type}'

    with DAG(dag_id, ...) as dag:
        extract = BashOperator(
            task_id='extract',
            bash_command=f'python extract.py --type {project_type}',
        )

    globals()[dag_id] = dag

Performance Optimization

Parallelism Settings

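# In airflow.cfg or environment
# Global limit on concurrently running task instances
AIRFLOW__CORE__PARALLELISM=32
# Per-DAG task concurrency (named DAG_CONCURRENCY before Airflow 2.2)
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16
# Maximum active DAG runs per DAG
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3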

Task Pools

# Create pool with 5 slots
docker exec airflow-scheduler airflow pools set extraction_pool 5 "Extraction task pool"

# Assign tasks to pool
extract_task = BashOperator(
    task_id='extract',
    bash_command='...',
    pool='extraction_pool',
)
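These limits stack: a task instance must fit under the global parallelism, the per-DAG limit, and its pool. A rough sketch of the resulting ceiling for tasks assigned to `extraction_pool`, assuming each task occupies one pool slot and no other DAG shares the pool:

```python
def effective_task_limit(parallelism, max_active_tasks_per_dag, pool_slots):
    """Upper bound on concurrently running tasks of one DAG in one pool."""
    return min(parallelism, max_active_tasks_per_dag, pool_slots)


limit = effective_task_limit(
    parallelism=32,               # global parallelism setting
    max_active_tasks_per_dag=16,  # per-DAG concurrency setting
    pool_slots=5,                 # slots in extraction_pool
)
print(limit)  # 5
```

With the values used in this section, the pool is the binding constraint, so raising the global or per-DAG settings alone would not increase extraction throughput.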

Metadata

License: unknown
Version: -
Updated: 1/26/2026
Publisher: Diatonic-AI

Tags

api, ci-cd, database, github-actions, observability, testing