Workflow Orchestration Skill
Description
Manages Apache Airflow DAGs for orchestrating OpenGov Harvester extraction workflows.
Triggers
- "start airflow"
- "trigger dag"
- "check task status"
- "airflow orchestration"
- "schedule extraction"
Airflow Architecture
Deployment
Method: Docker Compose
Services: Webserver, Scheduler, Postgres, Redis
Web UI: http://localhost:8080
Credentials: airflow / airflow (default)
DAG Structure
Core DAGs:
- opengov_master_pipeline: E2E orchestration (Inventory → Extraction → Report)
- opengov_inventory_queue_manager: Daily project list refresh
- opengov_queue_worker_pool: Parallel extraction with dynamic task mapping
Common Operations
Start/Stop Airflow
Start Services:
./scripts/airflow_start.sh
# Or manually
docker-compose -f airflow/docker-compose.yml up -d
Stop Services:
./scripts/airflow_stop.sh
# Or manually
docker-compose -f airflow/docker-compose.yml down
Check Status:
docker ps | grep airflow
# View logs
./scripts/airflow_logs.sh
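For a scripted readiness check, the webserver's `/health` endpoint can be polled instead of grepping `docker ps`. The following is a minimal standard-library sketch. It assumes the Airflow 2 webserver is reachable at http://localhost:8080 and returns JSON with `metadatabase.status` and `scheduler.status` fields. `is_healthy` and `fetch_health` are hypothetical helper names.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """Return True when both the metadata DB and the scheduler report 'healthy'."""
    components = ("metadatabase", "scheduler")
    return all(
        payload.get(name, {}).get("status") == "healthy" for name in components
    )


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """GET <base_url>/health and decode the JSON body (assumes Airflow 2's endpoint)."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
        return json.load(resp)
```

Usage: after `./scripts/airflow_start.sh`, call `is_healthy(fetch_health())` in a retry loop until it returns True.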
DAG Management
List DAGs:
docker exec -it airflow-scheduler airflow dags list
Trigger DAG:
# Trigger with default config
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline
# Trigger with parameters
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline \
--conf '{"backend": "supabase", "batch_size": 10}'
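Quoting JSON by hand inside `--conf` is error-prone. The following sketch builds the same `airflow dags trigger` invocation as an argument list, so `json.dumps` handles the quoting and no shell is involved. The container name `airflow-scheduler` comes from the commands above. `build_trigger_cmd` and `trigger` are hypothetical helpers.

```python
import json
import subprocess
from typing import List, Optional


def build_trigger_cmd(dag_id: str, conf: Optional[dict] = None,
                      container: str = "airflow-scheduler") -> List[str]:
    """Return argv for `airflow dags trigger` run inside the scheduler container."""
    cmd = ["docker", "exec", container, "airflow", "dags", "trigger", dag_id]
    if conf:
        # json.dumps emits valid JSON; passing argv directly avoids shell quoting
        cmd += ["--conf", json.dumps(conf)]
    return cmd


def trigger(dag_id: str, conf: Optional[dict] = None) -> None:
    """Run the trigger command; raises CalledProcessError on a non-zero exit."""
    subprocess.run(build_trigger_cmd(dag_id, conf), check=True)
```

`-it` is omitted because a scripted call needs no interactive terminal.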
Pause/Unpause DAG:
docker exec -it airflow-scheduler airflow dags pause opengov_master_pipeline
docker exec -it airflow-scheduler airflow dags unpause opengov_master_pipeline
Task Management
List Tasks:
docker exec -it airflow-scheduler airflow tasks list opengov_master_pipeline
Test Task:
docker exec -it airflow-scheduler airflow tasks test \
opengov_master_pipeline \
extract_opportunities \
2026-01-25
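View Task Logs:
# Airflow 2 has no `airflow tasks logs` subcommand; read the task's log file instead
# (path assumes the Airflow 2.3+ default log layout under /opt/airflow/logs)
docker exec -it airflow-scheduler cat \
"/opt/airflow/logs/dag_id=opengov_master_pipeline/run_id=manual__2026-01-25T12:00:00+00:00/task_id=extract_opportunities/attempt=1.log"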
Monitoring
View DAG Runs:
docker exec -it airflow-scheduler airflow dags list-runs \
-d opengov_master_pipeline \
--state running
View Task Instances:
docker exec -it airflow-scheduler airflow tasks states-for-dag-run \
opengov_master_pipeline \
manual__2026-01-25T12:00:00+00:00
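To script these checks, `airflow dags list-runs` can emit JSON with `-o json`. The following sketch summarizes run states from that output. It assumes each record carries `run_id` and `state` keys, so confirm this against your Airflow version's output. The helper names are hypothetical.

```python
import json
import subprocess
from collections import Counter


def fetch_runs_json(dag_id: str, container: str = "airflow-scheduler") -> str:
    """Capture `airflow dags list-runs -d <dag_id> -o json` from the scheduler container."""
    result = subprocess.run(
        ["docker", "exec", container, "airflow", "dags", "list-runs",
         "-d", dag_id, "-o", "json"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def summarize_runs(raw_json: str) -> Counter:
    """Count DAG runs per state."""
    return Counter(run["state"] for run in json.loads(raw_json))


def failed_run_ids(raw_json: str) -> list:
    """Return run_ids whose state is 'failed', in the order listed."""
    return [run["run_id"] for run in json.loads(raw_json) if run["state"] == "failed"]
```

A failed `run_id` from `failed_run_ids(fetch_runs_json("opengov_master_pipeline"))` can then be passed to `airflow tasks states-for-dag-run`.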
DAG Configuration
Master Pipeline DAG
# airflow/dags/opengov_master_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    description='Complete OpenGov extraction pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['opengov', 'extraction', 'master'],
) as dag:
    # Task 1: Inventory collection
    collect_inventory = BashOperator(
        task_id='collect_inventory',
        bash_command='python /app/scripts/etl/fetch_project_data.py',
    )
    # Task 2: Sequential extraction
    extract_opportunities = BashOperator(
        task_id='extract_opportunities',
        bash_command='timeout 600 python /app/scripts/etl/step_3_sequential_extraction.py',
    )
    # Task 3: Generate report
    generate_report = BashOperator(
        task_id='generate_report',
        bash_command='python /app/scripts/generate_report.py',
    )
    # Define task dependencies
    collect_inventory >> extract_opportunities >> generate_report
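With `retries=3` and `retry_delay=timedelta(minutes=5)`, each task gets up to four attempts, and `timeout 600` caps each attempt of `extract_opportunities` at 600 seconds. The following worked calculation gives the worst-case wall-clock time for that task. It ignores scheduling latency and assumes `retry_exponential_backoff` is not enabled.

```python
from datetime import timedelta


def worst_case_duration(attempt_timeout: timedelta, retries: int,
                        retry_delay: timedelta) -> timedelta:
    """Upper bound for one task: every attempt times out, with a delay between attempts."""
    attempts = retries + 1  # the first try plus each retry
    return attempts * attempt_timeout + retries * retry_delay


# extract_opportunities: `timeout 600` per attempt, retries=3, retry_delay=5 min
print(worst_case_duration(timedelta(seconds=600), 3, timedelta(minutes=5)))  # 0:55:00
```

If 55 minutes is too long for a daily run, lower `retries` for this task alone. Alternatively, replace the shell `timeout` with the operator's `execution_timeout` argument, which Airflow enforces itself.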
Dynamic Task Mapping
# Dynamic fan-out for parallel processing
from airflow import DAG
from airflow.decorators import task
@task
def get_project_ids():
    """Fetch list of project IDs to process"""
    import sqlite3
    from contextlib import closing
    # closing() releases the connection even if the query fails
    with closing(sqlite3.connect('/app/data/db/opengov_state.db')) as conn:
        cursor = conn.execute(
            "SELECT project_id FROM opengov_projects WHERE extracted = 0 LIMIT 10"
        )
        return [row[0] for row in cursor.fetchall()]
@task
def process_project(project_id: str):
    """Process single project"""
    import subprocess
    # check=True makes a non-zero exit fail this mapped task instance
    subprocess.run(
        ['python', '/app/scripts/etl/extract_single_project.py',
         '--project-id', str(project_id)],
        check=True,
    )
with DAG(...) as dag:  # DAG id and arguments elided
    project_ids = get_project_ids()
    # One process_project task instance per returned ID
    process_project.expand(project_id=project_ids)
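Outside Airflow, the same fan-out can be sketched with the standard library to check the selection query. `get_project_ids` mirrors the `SELECT ... LIMIT 10`, with an `ORDER BY` added here for deterministic output. `ThreadPoolExecutor.map` stands in for `.expand()`, making one call per ID. The in-memory table assumes only the `project_id` and `extracted` columns implied by the query. `process_project` is a hypothetical stand-in that does not run the extraction script.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing


def get_project_ids(conn: sqlite3.Connection, limit: int = 10) -> list:
    """Select up to `limit` projects that have not been extracted yet."""
    rows = conn.execute(
        "SELECT project_id FROM opengov_projects WHERE extracted = 0 "
        "ORDER BY project_id LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]


def process_project(project_id: str) -> str:
    """Hypothetical per-project work; a real task would run the extraction script."""
    return f"processed {project_id}"


def fan_out(project_ids: list, max_workers: int = 5) -> list:
    """Run process_project once per ID, like one mapped task instance each."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_project, project_ids))


with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE opengov_projects (project_id TEXT, extracted INTEGER)")
    conn.executemany(
        "INSERT INTO opengov_projects VALUES (?, ?)",
        [("p1", 0), ("p2", 1), ("p3", 0)],
    )
    ids = get_project_ids(conn)  # only rows with extracted = 0
    results = fan_out(ids)
```

As with `.expand()`, the number of parallel units follows the query result rather than a hard-coded task list.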
Related Rules
- airflow-dynamic-mapping.md: Dynamic task mapping patterns
- airflow-orchestrate-only.md: Keep DAGs lightweight, delegate work
- airflow-concurrency-control.md: Manage parallelism
- airflow-error-handling.md: Partial failure recovery
- airflow-idempotent-backfills.md: Safe re-runs
Configuration
Environment Variables
# In docker-compose.yml or .env
AIRFLOW__CORE__EXECUTOR=LocalExecutor
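# [database] section since Airflow 2.3; AIRFLOW__CORE__SQL_ALCHEMY_CONN is the deprecated older name
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES=False
# Shows the full configuration, including connection strings, in the UI; use for local development only
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True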
DAG Parameters
{
"backend": "supabase",
"batch_size": 10,
"max_workers": 5,
"timeout_seconds": 600,
"enable_anti_detection": true
}
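Tasks receive these values through `dag_run.conf`, which may be empty for scheduled runs. The following sketch merges a run's conf over defaults taken from the example above and rejects unknown keys or wrong types. `DEFAULT_PARAMS` and `resolve_params` are hypothetical names, and rejecting unknown keys is a design assumption rather than Airflow behavior.

```python
DEFAULT_PARAMS = {
    "backend": "supabase",
    "batch_size": 10,
    "max_workers": 5,
    "timeout_seconds": 600,
    "enable_anti_detection": True,
}


def resolve_params(conf):
    """Overlay a (possibly None) run conf on DEFAULT_PARAMS, validating keys and types."""
    conf = conf or {}
    unknown = set(conf) - set(DEFAULT_PARAMS)
    if unknown:
        raise ValueError(f"unknown conf keys: {sorted(unknown)}")
    merged = {**DEFAULT_PARAMS, **conf}
    for key, default in DEFAULT_PARAMS.items():
        # bool is a subclass of int, so compare exact types rather than isinstance
        if type(merged[key]) is not type(default):
            raise TypeError(f"{key} must be {type(default).__name__}")
    return merged
```

Inside a task, `resolve_params(context["dag_run"].conf)` turns a typo such as `"batch_size": "10"` into an immediate, readable failure instead of a wrong extraction.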
Monitoring & Alerting
Web UI Monitoring
- Open http://localhost:8080
- Navigate to "DAGs" view
- Click on DAG name to see runs
- Click on run to see task details
- View logs for debugging
CLI Monitoring
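# Watch DAG runs in real time (no -it: the command runs non-interactively)
watch -n 5 'docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state running'
# Find failed runs, then list task states for one of them
docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state failed
docker exec airflow-scheduler airflow tasks states-for-dag-run opengov_master_pipeline <run_id>
# Explain why a specific task instance is not running (unmet dependencies)
docker exec airflow-scheduler airflow tasks failed-deps opengov_master_pipeline extract_opportunities <run_id>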
Logging
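Location: airflow/logs/
Format (Airflow 2.3+ default): dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<try_number>.log
Mapped task instances add a map_index=<n>/ directory before the attempt file; Airflow versions before 2.3 used <dag_id>/<task_id>/<execution_date>/<try_number>.log
# Tail logs
tail -f "airflow/logs/dag_id=opengov_master_pipeline/run_id=manual__2026-01-25T12:00:00+00:00/task_id=extract_opportunities/attempt=1.log"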
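The following sketch builds the log path for a given task attempt. It assumes the Airflow 2.3+ default `log_filename_template` (`dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/[map_index=<n>/]attempt=<try>.log`) and the `airflow/logs/` location above. The path includes the `map_index=` segment that mapped tasks such as `process_project` get. `task_log_path` is a hypothetical helper.

```python
from pathlib import PurePosixPath


def task_log_path(base: str, dag_id: str, run_id: str, task_id: str,
                  try_number: int, map_index: int = -1) -> PurePosixPath:
    """Build a log file path following the assumed Airflow 2.3+ default template."""
    path = PurePosixPath(base) / f"dag_id={dag_id}" / f"run_id={run_id}" / f"task_id={task_id}"
    if map_index >= 0:
        # only mapped task instances get a map_index directory
        path /= f"map_index={map_index}"
    return path / f"attempt={try_number}.log"
```

If your deployment overrides `log_filename_template`, adjust the segments to match.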
Troubleshooting
DAG Not Appearing
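Symptoms: DAG not visible in UI
Solutions:
- Check DAG file syntax:
python -m py_compile airflow/dags/my_dag.py
- Check scheduler logs:
docker logs airflow-scheduler
- Verify DAG folder:
docker exec airflow-scheduler ls /opt/airflow/dags
- List DAG parse errors:
docker exec airflow-scheduler airflow dags list-import-errors
- Wait for the next DAG folder scan: new files are picked up every dag_dir_list_interval (300 seconds by default)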
Task Stuck in Running
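Symptoms: Task shows "running" but is not progressing
Solutions:
- Check task logs for errors
- Verify the task process is running:
docker exec airflow-scheduler ps aux
- Clear the stuck task instance so the scheduler reschedules it (-t takes a task_id regex, -s/-e limit the date range, -y skips the confirmation prompt):
docker exec airflow-scheduler airflow tasks clear -t '^<task>$' -s <start_date> -e <end_date> -y <dag>
- Restart the scheduler:
docker restart airflow-scheduler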
Connection to Worker Lost
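Symptoms: Task fails with "Worker lost" error
Solutions (these apply only to CeleryExecutor setups with a separate airflow-worker container; with the LocalExecutor setting under Environment Variables, tasks run inside the scheduler container):
- Check worker logs:
docker logs airflow-worker
- Increase worker timeout in config
- Check resource limits:
docker stats
- Restart worker:
docker restart airflow-worker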
Best Practices
DAG Design
- ✅ Keep DAGs idempotent (safe to re-run)
- ✅ Use task groups for logical grouping
- ✅ Set appropriate retries and timeouts
- ✅ Use XCom for small data passing only
- ✅ Delegate heavy work to external scripts
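Idempotency usually means a re-run overwrites its own output instead of appending duplicates. The following minimal sketch uses SQLite's `INSERT ... ON CONFLICT DO UPDATE`, which requires SQLite 3.24+. It keys rows on a hypothetical `(project_id, run_date)` pair, and the `extraction_results` table is illustrative, not the project's schema.

```python
import sqlite3


def save_result(conn, project_id: str, run_date: str, payload: str) -> None:
    """Write one result; re-running for the same (project_id, run_date) overwrites it."""
    conn.execute(
        """
        INSERT INTO extraction_results (project_id, run_date, payload)
        VALUES (?, ?, ?)
        ON CONFLICT (project_id, run_date) DO UPDATE SET payload = excluded.payload
        """,
        (project_id, run_date, payload),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE extraction_results ("
    "project_id TEXT, run_date TEXT, payload TEXT, "
    "PRIMARY KEY (project_id, run_date))"
)
save_result(conn, "p1", "2026-01-25", "first attempt")
save_result(conn, "p1", "2026-01-25", "retry")  # same key: replaces, no duplicate row
rows = conn.execute("SELECT payload FROM extraction_results").fetchall()
```

Keying on the run's logical date rather than wall-clock time is what makes retries and backfills converge to the same final state.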
Error Handling
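from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
@task
def process_with_validation(data):
    # validate() and process() stand for project-specific helpers (not shown)
    if not validate(data):
        # Mark this task instance as skipped instead of failed
        raise AirflowSkipException("Invalid data, skipping")
    return process(data)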
Resource Management
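# Limit concurrent tasks: max_active_runs and max_active_tasks are DAG arguments,
# not default_args (default_args are passed to every operator)
with DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    max_active_runs=1,   # at most one active run of this DAG
    max_active_tasks=5,  # at most 5 running task instances in this DAG
) as dag:
    # Set task-level concurrency
    extract_task = BashOperator(
        task_id='extract',
        bash_command='...',
        pool='extraction_pool',  # Custom pool with limited slots
    )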
Advanced Features
Branching
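from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
def choose_backend(**context):
    # conf may be empty or None when the run was not triggered with --conf
    backend = (context['dag_run'].conf or {}).get('backend', 'sqlite')
    # Return the task_id to follow; the other downstream branch is skipped
    return f'extract_to_{backend}'
branch = BranchPythonOperator(
    task_id='choose_backend',
    python_callable=choose_backend,
)
extract_sqlite = BashOperator(task_id='extract_to_sqlite', bash_command='...')
extract_supabase = BashOperator(task_id='extract_to_supabase', bash_command='...')
branch >> [extract_sqlite, extract_supabase]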
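The branch callable returns a task ID, so an unexpected `backend` value (for example, a typo in `--conf`) makes the branch fail at run time with an unclear error. The following sketch is a stricter version that is easy to unit-test. `KNOWN_BACKENDS` and `backend_task_id` are hypothetical names, and the set lists the two branches defined above.

```python
KNOWN_BACKENDS = {"sqlite", "supabase"}


def backend_task_id(conf) -> str:
    """Map a run conf to the downstream task_id the branch should follow."""
    backend = (conf or {}).get("backend", "sqlite")
    if backend not in KNOWN_BACKENDS:
        raise ValueError(
            f"unsupported backend {backend!r}; expected one of {sorted(KNOWN_BACKENDS)}"
        )
    return f"extract_to_{backend}"


def choose_backend(**context):
    """BranchPythonOperator callable: delegate to the pure function above."""
    return backend_task_id(context["dag_run"].conf)
```

Keeping the decision in a plain function lets it be tested without a running scheduler. Only the thin `choose_backend` wrapper touches the Airflow context.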
Dynamic DAG Generation
# Generate multiple similar DAGs
for project_type in ['federal', 'state', 'local']:
dag_id = f'opengov_extract_{project_type}'
with DAG(dag_id, ...) as dag:
extract = BashOperator(
task_id='extract',
bash_command=f'python extract.py --type {project_type}',
)
globals()[dag_id] = dag
Performance Optimization
Parallelism Settings
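# Environment variables (equivalent airflow.cfg keys live under [core])
# Global parallelism: maximum running task instances across the installation
AIRFLOW__CORE__PARALLELISM=32
# Per-DAG concurrency (named DAG_CONCURRENCY before Airflow 2.2)
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16
# Maximum active runs per DAG
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3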
Task Pools
# Create pool with 5 slots
docker exec airflow-scheduler airflow pools set extraction_pool 5 "Extraction task pool"
# Assign tasks to pool
extract_task = BashOperator(
    task_id='extract',
    bash_command='...',
    pool='extraction_pool',
)
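A pool caps how many task instances assigned to it run at once, regardless of which DAG they belong to. The following standard-library analogy uses a `threading.BoundedSemaphore` with 5 slots to show the effect. It illustrates the behavior only and is not how the Airflow scheduler implements pools.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 5  # mirrors `airflow pools set extraction_pool 5 ...`
slots = threading.BoundedSemaphore(POOL_SLOTS)
lock = threading.Lock()
running = 0
peak = 0


def extract(task_number: int) -> int:
    """Occupy one pool slot while 'working', tracking peak concurrency."""
    global running, peak
    with slots:  # blocks until a slot is free, like a queued task instance
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # simulated work
        with lock:
            running -= 1
    return task_number


# 20 tasks and 20 worker threads, but never more than POOL_SLOTS run at once
with ThreadPoolExecutor(max_workers=20) as executor:
    done = list(executor.map(extract, range(20)))
```

Even with more executor capacity than slots, throughput is bounded by the pool. This is why a small pool is a simple way to keep extraction from overwhelming the upstream site.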
