Python Data Engineering
Comprehensive patterns for building production-grade data pipelines, dimensional models, and API integrations using SQLAlchemy 2.0+ and modern data warehousing practices.
When to Use This Skill
Apply this skill when building custom Python data pipelines with SQLAlchemy and PostgreSQL.
Use this skill for:
- Building ETL/ELT pipelines with Python
- Transforming API responses to database models
- Designing dimensional warehouses (fact/dimension tables)
- Implementing schema evolution strategies
- Creating reusable data transformation layers
- Integrating external systems (SaaS APIs, ERPs) with PostgreSQL
Consider frameworks FIRST for common SaaS integrations:
- Airbyte (600+ connectors, open-source) - NetSuite, Salesforce, Stripe, etc.
- Meltano/Singer (300+ taps, code-centric) - Reproducible, Git-based pipelines
- Fivetran (500+ connectors, managed) - Fast time-to-market, higher cost
- dbt (SQL transformations only) - Post-load transformations, not extraction
Build custom Python when:
- No existing connector for your source system
- Complex business logic required during transformation
- Unique incremental sync requirements
- Strategic differentiator justifying maintenance cost
- Existing connectors don't support your customizations
Build vs Buy Decision Tree
Need to integrate external data source?
│
├─ Check existing connectors (Airbyte, Fivetran, Meltano)
│ │
│ ├─ Connector exists + meets requirements
│ │ └─ [USE FRAMEWORK] - Faster, maintained, documented
│ │
│ └─ No connector OR custom logic needed
│ │
│ ├─ Simple transformation (SQL only)
│ │ └─ [USE dbt] - Post-load SQL transformations
│ │
│ └─ Complex transformation OR custom logic
│ └─ [BUILD CUSTOM PYTHON] - Full control, use patterns in this skill
│
└─ Consider:
• Maintenance cost (custom code = ongoing maintenance)
• Time to market (framework = faster initial setup)
• Customization needs (custom code = unlimited flexibility)
• Team expertise (SQL vs Python)
Decision guide: See when-to-build-vs-buy.md for detailed analysis
Quick Reference by Topic
SQLAlchemy Patterns
TypeDecorators (custom column types): See sqlalchemy/type-decorators.md Factory methods (from_dict, from_api): See sqlalchemy/factory-patterns.md Query patterns: See sqlalchemy/query-patterns.md Repository pattern: See sqlalchemy/repository-pattern.md Migrations with Alembic: See sqlalchemy/migrations.md
Pydantic Integration
API validation: See pydantic/api-validation.md Settings management: See pydantic/settings-management.md
Data Warehouse Patterns
Naming conventions (dim_, fact_): See warehouse/naming-conventions.md Slowly Changing Dimensions: See warehouse/scd-patterns.md
Integration Patterns
Field mapping strategies: See integration/field-mapping.md Incremental sync (high-water mark): See integration/incremental-sync.md Schema resilience (JSONB): See integration/schema-resilience.md UV package management (local deps): See integration/uv-package-management.md
Core Architecture Pattern
This skill teaches the three-layer separation for data pipelines:
Source System Layer (libs.external_api)
└─ HTTP client, authentication, queries
└─ Source-specific quirks and normalization
Database Schema Layer (libs.database)
└─ Models (DimCustomer, FactOrders, etc.)
└─ Mappers (API → Model transformations)
└─ Custom types (TypeDecorators)
Application Layer (your applications)
└─ Orchestration (when, what, how to sync)
└─ Business logic (analysis, reporting)
└─ CLI/API interfaces
Why this separation:
- Source layer changes when external API evolves
- Schema layer changes when business needs evolve
- Application layer changes when workflows evolve
- Clear boundaries prevent coupling
Common Use Cases
Use Case 1: API Response → Database Model
Problem: Transform external API response to SQLAlchemy model with type conversions
Solution: Factory classmethod + TypeDecorators
class Customer(Base):
email: Mapped[str] = mapped_column(NormalizedEmail)
is_active: Mapped[bool] = mapped_column(BooleanStringType)
@classmethod
def from_api_response(cls, data: dict, sync_time: datetime) -> "Customer":
# Transformation logic here
See sqlalchemy/factory-patterns.md for complete pattern.
Use Case 2: Custom Field Schema Evolution
Problem: SaaS system adds/removes custom fields, breaking rigid schema
Solution: JSONB column with lifecycle metadata
class FlexibleRecordMixin:
custom_fields: Mapped[dict] = mapped_column(JSONB)
# Structure: {"field": {"value": ..., "first_seen": ..., "deprecated": bool}}
See integration/schema-resilience.md for complete pattern.
Use Case 3: Incremental Sync from External System
Problem: Re-fetching all data is slow; need to sync only changed records
Solution: High-water mark pattern with sync metadata
max_date = session.query(func.max(Customer.last_modified)).scalar()
api_response = client.get(f"/customers?modified_since={max_date}")
See integration/incremental-sync.md for complete pattern.
Use Case 4: Multi-Source Data Integration
Problem: Combine data from multiple external systems into unified schema
Solution: Conformed dimensions + source-specific mappers
class Customer(Base):
source_system: Mapped[str] # "api_a", "api_b", "internal"
source_id: Mapped[str] # Original system's ID
class APIACustomerMapper:
@classmethod
def to_customer(cls, data: dict) -> Customer:
# API A schema → unified Customer schema
class APIBCustomerMapper:
@classmethod
def to_customer(cls, data: dict) -> Customer:
# API B schema → unified Customer schema
Each mapper handles its source's quirks independently.
Testing Your Data Pipeline
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from datetime import datetime, UTC
@pytest.fixture
def db_session():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
with Session(engine) as session:
yield session
def test_api_to_model_transformation(db_session):
api_response = {"customer_id": "C001", "name": "Test Corp", "email": "test@example.com"}
customer = Customer.from_api_response(api_response, datetime.now(UTC))
db_session.add(customer)
db_session.commit()
assert customer.customer_id == "C001"
assert customer.name == "Test Corp"
Best Practices Summary
SQLAlchemy
- Use TypeDecorators for custom types (set
cache_ok=True) - Factory classmethods for API transformations
- Hybrid properties for computed columns
- Event listeners for validation/audit (not business logic)
- Repository pattern for data access abstraction
Data Warehousing
- Star schema for analytics (fact tables + dimensions)
- Naming conventions:
dim_,fact_,stg_,bridge_ - SCD Type 2 for historical tracking
- Medallion architecture: bronze (raw) → silver (clean) → gold (business)
- Audit columns: created_at, updated_at, synced_at, schema_version
Integration
- Pydantic for API boundary validation
- Source-specific mappers for multi-system integration
- JSONB for flexible/evolving schemas
- Incremental sync with high-water marks
- Complete raw_data preservation for reprocessing
- Idempotent operations (safe to re-run)
This skill provides practical patterns for Python data engineering with SQLAlchemy, Pydantic, and PostgreSQL.
