python-data-pipelines

Data pipeline orchestration with Prefect and Airflow. Use this skill when building ETL/ELT pipelines, choosing an orchestration tool, or designing data workflows. Covers task dependencies, error handling, retries, scheduling, and best practices for data pipeline architecture.

Updated 2/15/2026

Python Data Pipelines

Modern data pipeline orchestration patterns with Prefect and Airflow.

Decision Matrix: Prefect vs Airflow

| Factor | Prefect | Airflow | Winner |
|---|---|---|---|
| Learning curve | Gentler (Pythonic) | Steeper (DAG syntax) | Prefect |
| Dynamic workflows | Native | Requires workarounds | Prefect |
| Local development | Excellent | Harder | Prefect |
| Ecosystem maturity | Newer (2018) | Mature (2014) | Airflow |

General guidance:

  • Use Prefect when: you are starting a new project, want a Pythonic API, or need dynamic workflows
  • Use Airflow when: your organization already runs Airflow, or you need a battle-tested tool

Prefect Patterns

Basic Task and Flow

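from prefect import flow, task

# fetch_from_api, process_record, and write_records are placeholders for
# your own source, transform, and sink logic.

@task
def extract_data(source: str) -> list:
    return fetch_from_api(source)

@task
def transform_data(data: list) -> list:
    return [process_record(r) for r in data]

@task
def load_data(data: list, destination: str) -> None:
    write_records(data, destination)

@flow(name="ETL Pipeline")
def etl_pipeline(source: str, destination: str):
    # Calling tasks inside a flow runs them in order and passes results along.
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, destination)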

Retries and Caching

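from datetime import timedelta

import requests
from prefect import task
from prefect.tasks import task_input_hash

@task(
    retries=3,                            # retry up to 3 times after a failure
    retry_delay_seconds=60,               # wait 60 seconds between attempts
    cache_key_fn=task_input_hash,         # same inputs reuse the cached result
    cache_expiration=timedelta(hours=1),  # cached results expire after 1 hour
)
def unreliable_api_call(endpoint: str):
    response = requests.get(endpoint, timeout=30)
    response.raise_for_status()  # raise on 4xx/5xx so the task is retried
    return response.json()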
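
To make the retry and caching options less opaque, here is a minimal plain-Python sketch of the same two ideas: re-run a call a fixed number of times with a fixed delay, and reuse a stored result while the hashed inputs match and the entry has not expired. It is not how Prefect implements either feature; `with_retries`, `cached`, `input_hash`, and `flaky_call` are hypothetical names used only for illustration.

```python
import hashlib
import json
import time
from functools import wraps


def with_retries(retries: int, delay_seconds: float, sleep=time.sleep):
    """Re-run the wrapped function up to `retries` extra times on any exception."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of attempts: surface the last error
                    sleep(delay_seconds)
        return wrapper
    return decorator


def input_hash(args, kwargs) -> str:
    """Stable key derived from the call arguments (assumed JSON-friendly)."""
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached(expiration_seconds: float, clock=time.monotonic):
    """Reuse a previous result for identical inputs until it expires."""
    def decorator(fn):
        store = {}  # key -> (expires_at, value)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            key = input_hash(args, kwargs)
            hit = store.get(key)
            if hit is not None and hit[0] > clock():
                return hit[1]
            value = fn(*args, **kwargs)
            store[key] = (clock() + expiration_seconds, value)
            return value
        return wrapper
    return decorator


# Usage: a call that fails twice with a transient error, then succeeds.
attempts = []


@with_retries(retries=3, delay_seconds=60, sleep=lambda _: None)  # no real sleep in the demo
def flaky_call():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky_call()
```

Injecting `sleep` and `clock` keeps the sketch testable without waiting. In a real pipeline the orchestrator's own retry and cache settings are usually the better choice, because they are recorded in the run history.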

See prefect-patterns.md for:

  • Subflows
  • Task results and artifacts
  • Scheduling and deployment

Airflow Patterns

Basic DAG

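from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# extract_fn, transform_fn, and load_fn are placeholders for your own callables.

with DAG(
    'etl_pipeline',
    schedule='@daily',  # Airflow 2.4+; older releases use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,      # do not backfill runs between start_date and today
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_fn)
    transform = PythonOperator(task_id='transform', python_callable=transform_fn)
    load = PythonOperator(task_id='load', python_callable=load_fn)

    # Run the three tasks strictly in order.
    extract >> transform >> load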
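
The `>>` chain declares that each task runs only after its upstream task has finished. As a rough illustration of what an orchestrator does with such declarations, the sketch below resolves a dependency map into a valid run order using the standard library's `graphlib`. It is not Airflow's scheduler; the `run_order` helper and the dependency map are hypothetical.

```python
from graphlib import TopologicalSorter


def run_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order for {task: {upstream tasks}}."""
    return list(TopologicalSorter(dependencies).static_order())


# extract >> transform >> load, written as "task: its upstream tasks".
etl_dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

order = run_order(etl_dependencies)
```

If the map contains a cycle, `static_order()` raises `graphlib.CycleError`, which mirrors the rule that a DAG must stay acyclic.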

See airflow-patterns.md for:

  • TaskFlow API (modern Airflow)
  • Sensors for waiting
  • Branch operators
  • Dynamic task generation

Pipeline Design Best Practices

Idempotency

# GOOD - Upsert based on key, so reruns overwrite rows instead of duplicating them.
# `db` stands in for your database client.
def load_data(data):
    for record in data:
        db.upsert(record, key='id')
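
For a concrete version of the upsert idea, the following sketch uses the standard library's `sqlite3` module and SQLite's `ON CONFLICT ... DO UPDATE` clause (available in SQLite 3.24 and later). The `orders` table, its columns, and the `upsert_records` helper are assumptions made for the example, not part of the original skill.

```python
import sqlite3


def upsert_records(conn: sqlite3.Connection, data: list[dict]) -> None:
    """Insert new rows and overwrite existing rows that share the same id."""
    with conn:  # commit on success, roll back on error
        conn.executemany(
            """
            INSERT INTO orders (id, amount) VALUES (:id, :amount)
            ON CONFLICT(id) DO UPDATE SET amount = excluded.amount
            """,
            data,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")

batch = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 25.5}]
upsert_records(conn, batch)
upsert_records(conn, batch)  # re-running the same batch changes nothing

rows = conn.execute("SELECT id, amount FROM orders ORDER BY id").fetchall()
```

Running the load twice leaves two rows, which is the property that makes retries and backfills safe.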

Incremental Processing

from datetime import datetime
from prefect import task

@task
def extract_incremental(last_run: datetime):
    return fetch_data_since(last_run)  # only records changed since last_run
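
The `last_run` value has to come from somewhere. One common approach, sketched here under assumptions, is to persist a high-water mark after each successful load and read it back on the next run. The JSON state file, the `updated_at` field, and the helper names are all hypothetical.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def read_watermark(path: Path, default: datetime) -> datetime:
    """Return the timestamp stored by the last successful run, or `default`."""
    if not path.exists():
        return default
    return datetime.fromisoformat(json.loads(path.read_text())["last_run"])


def write_watermark(path: Path, value: datetime) -> None:
    """Persist the new high-water mark once the load has succeeded."""
    path.write_text(json.dumps({"last_run": value.isoformat()}))


def select_new(records: list[dict], since: datetime) -> list[dict]:
    """Keep only records updated strictly after the watermark."""
    return [r for r in records if r["updated_at"] > since]


# Usage: the first run sees everything, the second run sees nothing new.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": 2, "updated_at": datetime(2024, 1, 2, tzinfo=timezone.utc)},
]

with tempfile.TemporaryDirectory() as tmp:
    state = Path(tmp) / "watermark.json"
    first = select_new(source, read_watermark(state, epoch))
    write_watermark(state, max(r["updated_at"] for r in first))
    second = select_new(source, read_watermark(state, epoch))
```

The watermark is taken from the newest record rather than the wall clock, so a run that starts late does not skip records stamped before it began.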

Data Quality Checks

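from prefect import task

@task
def validate_data(data: list) -> list:
    # Raise instead of assert: assertions are skipped under `python -O`.
    for record in data:
        if 'id' not in record:
            raise ValueError("Missing ID")
        if record['amount'] < 0:
            raise ValueError("Negative amount")
    return data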
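
Failing the whole batch on the first bad record is not always what you want. As an alternative sketch, the function below applies the same two checks but separates valid records from rejected ones and keeps the reason for each rejection, so bad rows can be logged or quarantined. The `split_valid` name and the extra "missing amount" rule are assumptions added for the example.

```python
def split_valid(records: list[dict]) -> tuple[list[dict], list[tuple[dict, str]]]:
    """Return (valid records, [(rejected record, reason), ...])."""
    valid, rejected = [], []
    for record in records:
        amount = record.get("amount")
        if "id" not in record:
            rejected.append((record, "Missing ID"))
        elif not isinstance(amount, (int, float)):
            rejected.append((record, "Missing or non-numeric amount"))
        elif amount < 0:
            rejected.append((record, "Negative amount"))
        else:
            valid.append(record)
    return valid, rejected


good, bad = split_valid(
    [
        {"id": 1, "amount": 5},
        {"amount": 3},
        {"id": 2, "amount": -1},
    ]
)
```

A pipeline can then load `good` and raise, or alert, only when the share of rejected rows crosses a threshold you choose.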

See pipeline-design-patterns.md for:

  • Partitioning strategies
  • Backfill patterns
  • Monitoring and alerting

Testing Pipelines

See testing-pipelines.md for:

  • Mocking data sources
  • Integration test patterns
  • Local development setups
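
As a small illustration of mocking a data source, the sketch below injects the fetch function into the extract step so a test can replace it with `unittest.mock.Mock`. The function bodies, the lower-casing transform, and the URL are hypothetical stand-ins, and the code tests plain functions rather than decorated tasks.

```python
from unittest.mock import Mock


def extract_data(source: str, fetch) -> list:
    """Pull raw records; `fetch` is injected so tests can replace it."""
    return fetch(source)


def transform_data(data: list) -> list:
    """Example transform: normalize names to lower case."""
    return [{**r, "name": r["name"].lower()} for r in data]


def test_extract_then_transform():
    # The mock stands in for the real API client, so no network is needed.
    fake_fetch = Mock(return_value=[{"id": 1, "name": "ALICE"}])

    raw = extract_data("https://example.invalid/users", fake_fetch)
    result = transform_data(raw)

    fake_fetch.assert_called_once_with("https://example.invalid/users")
    assert result == [{"id": 1, "name": "alice"}]
```

Keeping the business logic in plain functions and wrapping them with the orchestrator's decorators at the edge tends to make this style of test straightforward.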

Anti-Patterns to Avoid

| Avoid | Use Instead |
|---|---|
| Non-idempotent operations | Upserts, delete-and-insert |
| Tightly coupled tasks | Clear interfaces |
| No error handling | Retries, alerts, checkpoints |
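
The delete-and-insert alternative listed in the table can be sketched as replacing one partition inside a single transaction, so a rerun for the same day never leaves duplicates or a half-written partition. The `daily_sales` table, its columns, and the `replace_partition` helper are assumptions for illustration.

```python
import sqlite3


def replace_partition(
    conn: sqlite3.Connection, day: str, rows: list[tuple[str, float]]
) -> None:
    """Atomically replace every row for `day` with `rows`."""
    with conn:  # one transaction: both statements commit, or neither does
        conn.execute("DELETE FROM daily_sales WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO daily_sales (day, sku, amount) VALUES (?, ?, ?)",
            [(day, sku, amount) for sku, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (day TEXT, sku TEXT, amount REAL)")

replace_partition(conn, "2024-01-01", [("a", 1.0), ("b", 2.0)])
replace_partition(conn, "2024-01-01", [("a", 1.0), ("b", 2.0)])  # rerun: still two rows

row_count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
```

This pattern suits tables without a natural unique key, where an upsert has nothing to match on.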

source: Prefect docs, Airflow docs

Metadata

License: unknown
Version: -
Publisher: justanesta

Tags

api, ci-cd, observability, testing