askill
building-with-sqlmodel-async

building-with-sqlmodel-asyncSafety 90Repository

Use when building async database layers with SQLModel and PostgreSQL. Triggers include async session management, create_async_engine, SQLModel relationships, CRUD operations with async/await, N+1 prevention with selectinload, JSONB columns, self-referential models, or Alembic async migrations. NOT when using sync SQLAlchemy (use sync patterns) or raw SQL (use SQLModel ORM).

143 stars
2.9k downloads
Updated 2/22/2026

Package Files

Loading files...
SKILL.md

SQLModel Async Database Development Guide

Build production async database layers for FastAPI agent backends using SQLModel + SQLAlchemy 2.0 async patterns with PostgreSQL.

Overview

SQLModel combines Pydantic and SQLAlchemy, providing type-safe ORM with async support. For agent backends, async database operations are essential for non-blocking I/O during agent tool calls, API requests, and concurrent operations.

Quick Reference

Installation

# Production stack
pip install sqlmodel sqlalchemy[asyncio] asyncpg alembic

# Development
pip install aiosqlite  # For SQLite async testing

Core Imports

from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel, Field, Relationship, select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB

Engine Setup

Production PostgreSQL

from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from collections.abc import AsyncGenerator

# Convert sync URL to async format
def get_async_database_url(url: str) -> str:
    """Convert postgresql:// to postgresql+asyncpg://"""
    if url.startswith("postgresql://"):
        url = url.replace("postgresql://", "postgresql+asyncpg://", 1)
    elif url.startswith("postgres://"):
        url = url.replace("postgres://", "postgresql+asyncpg://", 1)
    return url

DATABASE_URL = get_async_database_url(settings.database_url)

engine = create_async_engine(
    DATABASE_URL,
    echo=settings.debug,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,    # Essential for managed DBs (Neon, Supabase)
    pool_recycle=300,      # Recycle connections every 5 minutes
)

SQLite for Testing

if DATABASE_URL.startswith("sqlite"):
    engine = create_async_engine(
        DATABASE_URL,
        echo=settings.debug,
        connect_args={"check_same_thread": False},
    )

Table Creation

async def create_db_and_tables() -> None:
    """Create all database tables."""
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

Session Management

FastAPI Dependency

async def get_session() -> AsyncGenerator[AsyncSession]:
    """Dependency that yields async database sessions."""
    async with AsyncSession(engine) as session:
        yield session

Using in Endpoints

from fastapi import Depends

@router.get("/api/tasks/{task_id}")
async def get_task(
    task_id: int,
    session: AsyncSession = Depends(get_session),
):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

Model Design

Basic Model with JSONB

from sqlmodel import SQLModel, Field
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from datetime import datetime

class Task(SQLModel, table=True):
    """A unit of work with metadata."""

    __tablename__ = "task"

    id: int | None = Field(default=None, primary_key=True)
    title: str = Field(max_length=500)
    description: str | None = Field(default=None)
    status: str = Field(default="pending")
    priority: str = Field(default="medium")

    # JSONB for list/dict fields (PostgreSQL)
    tags: list[str] = Field(
        default_factory=list,
        sa_column=Column(JSONB, nullable=False, server_default="[]"),
    )

    # Timestamps
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

Foreign Keys and Relationships

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .project import Project
    from .worker import Worker

class Task(SQLModel, table=True):
    # Foreign keys
    project_id: int = Field(foreign_key="project.id", index=True)
    assignee_id: int | None = Field(
        default=None,
        foreign_key="worker.id",
    )

    # Relationships
    project: "Project" = Relationship(back_populates="tasks")
    assignee: "Worker" = Relationship(
        back_populates="assigned_tasks",
        sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"},
    )

Self-Referential Relationships (Parent-Child)

class Task(SQLModel, table=True):
    parent_task_id: int | None = Field(
        default=None,
        foreign_key="task.id",
    )

    # Self-referential: parent
    parent: "Task" = Relationship(
        back_populates="subtasks",
        sa_relationship_kwargs={
            "remote_side": "Task.id",
            "foreign_keys": "[Task.parent_task_id]",
        },
    )

    # Self-referential: children
    subtasks: list["Task"] = Relationship(
        back_populates="parent",
        sa_relationship_kwargs={"foreign_keys": "[Task.parent_task_id]"},
    )

CRUD Operations

Create

async def create_task(
    session: AsyncSession,
    data: TaskCreate,
    creator_id: int,
) -> Task:
    task = Task(
        title=data.title,
        description=data.description,
        project_id=data.project_id,
        created_by_id=creator_id,
    )
    session.add(task)
    await session.flush()    # Get task.id without committing
    await session.commit()
    await session.refresh(task)
    return task

Read Single

async def get_task(session: AsyncSession, task_id: int) -> Task | None:
    return await session.get(Task, task_id)

Read with Query

async def list_tasks_by_project(
    session: AsyncSession,
    project_id: int,
    status: str | None = None,
) -> list[Task]:
    stmt = select(Task).where(Task.project_id == project_id)

    if status:
        stmt = stmt.where(Task.status == status)

    stmt = stmt.order_by(Task.created_at.desc())

    result = await session.exec(stmt)
    return list(result.all())

Update

async def update_task(
    session: AsyncSession,
    task: Task,
    data: TaskUpdate,
) -> Task:
    if data.title is not None:
        task.title = data.title
    if data.status is not None:
        task.status = data.status

    task.updated_at = datetime.utcnow()
    session.add(task)
    await session.commit()
    await session.refresh(task)
    return task

Delete

async def delete_task(session: AsyncSession, task: Task) -> None:
    await session.delete(task)
    await session.commit()

N+1 Prevention with Eager Loading

The Problem

# BAD: N+1 queries - each task.assignee triggers a query
tasks = (await session.exec(select(Task))).all()
for task in tasks:
    print(task.assignee.name)  # N additional queries!

The Solution: selectinload

from sqlalchemy.orm import selectinload

# GOOD: Eager load relationships in single query
stmt = (
    select(Task)
    .options(
        selectinload(Task.assignee),
        selectinload(Task.subtasks),
    )
    .where(Task.project_id == project_id)
)

result = await session.exec(stmt)
tasks = result.unique().all()  # unique() required with selectinload

for task in tasks:
    print(task.assignee.name)  # No additional queries!

When to Use Each Strategy

Relationship TypeStrategyWhy
Many-to-one (task → assignee)selectinload or joinedloadBoth efficient
One-to-many (project → tasks)selectinloadAvoids row explosion
Many-to-manyselectinloadSingle efficient query
Self-referentialselectinloadHandles recursion

Transaction Patterns

flush() vs commit()

async def create_with_audit(session: AsyncSession, data: dict):
    # Create main record
    task = Task(**data)
    session.add(task)
    await session.flush()  # Get task.id, keep transaction open

    # Create audit record using task.id
    audit = AuditLog(entity_id=task.id, action="created")
    session.add(audit)

    # Single commit for both
    await session.commit()

Rollback on Error

async def transactional_operation(session: AsyncSession):
    try:
        task = Task(title="New task")
        session.add(task)
        await session.flush()

        # Might fail
        await some_risky_operation(task.id)

        await session.commit()
    except Exception:
        await session.rollback()
        raise

Context Manager Pattern

async with AsyncSession(engine) as session:
    async with session.begin():
        # All operations in single transaction
        session.add(task1)
        session.add(task2)
    # Auto-commit on exit, rollback on exception

Alembic Async Migrations

Initialize

alembic init -t async alembic

Configure alembic.ini

sqlalchemy.url = postgresql+asyncpg://user:pass@localhost/dbname

Configure env.py

from sqlmodel import SQLModel
from your_app.models import Task, Project  # Import all models

target_metadata = SQLModel.metadata

def run_migrations_offline():
    context.configure(
        url=settings.database_url,
        target_metadata=target_metadata,
        literal_binds=True,
    )
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = create_async_engine(
        get_async_database_url(settings.database_url)
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    asyncio.run(run_async_migrations())

Generate and Run

alembic revision --autogenerate -m "Add tasks table"
alembic upgrade head

Common Patterns

Pagination

async def list_paginated(
    session: AsyncSession,
    limit: int = 50,
    offset: int = 0,
) -> list[Task]:
    stmt = (
        select(Task)
        .order_by(Task.created_at.desc())
        .offset(offset)
        .limit(limit)
    )
    result = await session.exec(stmt)
    return list(result.all())

Soft Delete

class Task(SQLModel, table=True):
    deleted_at: datetime | None = Field(default=None)

async def soft_delete(session: AsyncSession, task: Task):
    task.deleted_at = datetime.utcnow()
    session.add(task)
    await session.commit()

Bulk Insert

async def bulk_create_tasks(
    session: AsyncSession,
    tasks_data: list[dict],
) -> list[Task]:
    tasks = [Task(**data) for data in tasks_data]
    session.add_all(tasks)
    await session.commit()
    return tasks

Safety & Guardrails

NEVER

  • Use sync SQLAlchemy in async code (blocks event loop)
  • Share AsyncSession across concurrent tasks (not thread-safe)
  • Access lazy-loaded relationships without eager loading in async
  • Forget result.unique().all() with selectinload

ALWAYS

  • Use pool_pre_ping=True for managed databases
  • Import models before create_db_and_tables()
  • Use TYPE_CHECKING for relationship type hints
  • Handle MissingGreenlet errors (indicates lazy load in async)

Error Handling

from sqlalchemy.exc import IntegrityError

try:
    await session.commit()
except IntegrityError as e:
    await session.rollback()
    if "unique constraint" in str(e):
        raise HTTPException(400, "Duplicate entry")
    raise

TaskManager Database Example

Complete database layer for Task API:

# database.py
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/taskdb",
    pool_pre_ping=True,
    pool_size=5,
)

async def get_session():
    async with AsyncSession(engine) as session:
        yield session

# models/task.py
class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    status: str = Field(default="pending")
    project_id: int = Field(foreign_key="project.id")
    assignee_id: int | None = Field(foreign_key="worker.id")

    project: "Project" = Relationship(back_populates="tasks")
    assignee: "Worker" = Relationship(back_populates="tasks")

# routers/tasks.py
@router.get("/tasks")
async def list_tasks(
    session: AsyncSession = Depends(get_session),
    project_id: int = Query(...),
):
    stmt = (
        select(Task)
        .options(selectinload(Task.assignee))
        .where(Task.project_id == project_id)
    )
    result = await session.exec(stmt)
    return result.unique().all()

References

Load these for detailed patterns:

Install

Download ZIP
Requires askill CLI v1.0+

AI Quality Score

92/100Analyzed 2/23/2026

Comprehensive technical guide for async SQLModel + PostgreSQL development. Excellent coverage of engine setup, session management, model relationships, CRUD operations, N+1 prevention, transactions, and Alembic migrations. Well-structured with clear code examples and a dedicated safety section. Slightly penalized for being in a deep archive path, but content is highly reusable and actionable for any FastAPI project.

90
90
85
92
95

Metadata

Licenseunknown
Version-
Updated2/22/2026
Publisherpanaversity

Tags

apici-cddatabase