tasks-module

myfy TasksModule for background job processing with a SQL-based queue. Use when working with TasksModule, the @task decorator, background jobs, task workers, TaskContext, task retries, or async task dispatch.

Updated 1/12/2026

TasksModule - Background Jobs

TasksModule provides SQL-based asynchronous task processing with dependency injection (DI) and automatic retries.

Quick Start

from myfy.core import Application
from myfy.data import DataModule
from myfy.tasks import TasksModule, task

app = Application()
app.add_module(DataModule())
app.add_module(TasksModule(auto_create_tables=True))

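# Define a task (email_service is assumed to be provided by your application)
@task
async def send_email(to: str, subject: str, body: str) -> None:
    await email_service.send(to, subject, body)

# Dispatch from a route (route and NotifyRequest are assumed to be imported or defined elsewhere)
@route.post("/notifications")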
async def notify_user(body: NotifyRequest) -> dict:
    task_id = await send_email.send(
        to=body.email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )
    return {"task_id": task_id}

Configuration

Environment variables use the MYFY_TASKS_ prefix:

| Variable | Default | Description |
| --- | --- | --- |
| MYFY_TASKS_DEFAULT_MAX_RETRIES | 3 | Default retry attempts |
| MYFY_TASKS_RETRY_DELAY_SECONDS | 60.0 | Seconds between retries |
| MYFY_TASKS_WORKER_CONCURRENCY | 4 | Concurrent tasks per worker |
| MYFY_TASKS_POLL_INTERVAL | 1.0 | Seconds between queue polls |
| MYFY_TASKS_TASK_TIMEOUT | 300.0 | Max seconds per task |
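
To make the prefix convention concrete, here is a minimal sketch of how variables like these could be read with typed defaults. `TasksSettingsSketch` and `load_settings` are hypothetical names used only for illustration; they are not myfy's own settings classes, and myfy may parse its environment differently.

```python
import os
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class TasksSettingsSketch:
    """Hypothetical mirror of the table above; not myfy's settings class."""

    default_max_retries: int = 3
    retry_delay_seconds: float = 60.0
    worker_concurrency: int = 4
    poll_interval: float = 1.0
    task_timeout: float = 300.0


def load_settings(env=None, prefix="MYFY_TASKS_"):
    """Read each field from prefix + FIELD_NAME, falling back to the default."""
    env = os.environ if env is None else env
    values = {}
    for field in fields(TasksSettingsSketch):
        raw = env.get(prefix + field.name.upper())
        # Cast with the default's type so "8" becomes 8 and "0.5" becomes 0.5.
        values[field.name] = field.default if raw is None else type(field.default)(raw)
    return TasksSettingsSketch(**values)


settings = load_settings(
    {"MYFY_TASKS_WORKER_CONCURRENCY": "8", "MYFY_TASKS_POLL_INTERVAL": "0.5"}
)
print(settings)
```

Passing a plain dict instead of `os.environ` keeps the sketch easy to try without touching the real environment.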

Defining Tasks

Basic Task

from myfy.tasks import task

@task
async def process_order(order_id: int) -> str:
    # Process the order
    return f"Processed order {order_id}"

Task with Dependency Injection

Services are automatically injected at runtime:

from myfy.tasks import task
from myfy.data import AsyncSession

@task
async def sync_user_data(user_id: int, session: AsyncSession) -> None:
    # session is TASK-scoped (injected per task execution)
    user = await session.get(User, user_id)
    await sync_to_external_service(user)

Task with Custom Options

@task(max_retries=5, retry_on=[ConnectionError, TimeoutError])
async def upload_file(file_path: str) -> str:
    # Retries up to 5 times on connection/timeout errors
    return await s3.upload(file_path)

Dispatching Tasks

Basic Dispatch

# Returns immediately with task_id
task_id = await send_email.send(to="user@example.com", subject="Hi", body="Hello!")

Dispatch Options

task_id = await send_email.send(
    to="user@example.com",
    subject="Hi",
    body="Hello!",
    _priority=10,      # Higher priority = executes first
    _delay=60,         # Wait 60 seconds before executing
    _max_retries=5,    # Override default retries
)
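
As a rough picture of what `_priority` and `_delay` could mean for a SQL-backed queue, the sketch below uses an in-memory SQLite table. The table layout, column names, and the `enqueue` / `claim_next` helpers are assumptions made for illustration; they are not myfy's actual schema or API.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE tasks (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        priority INTEGER NOT NULL DEFAULT 0,
        run_at REAL NOT NULL
    )
    """
)


def enqueue(name, now, priority=0, delay=0.0):
    """Insert a pending row that becomes eligible at now + delay."""
    cur = conn.execute(
        "INSERT INTO tasks (name, priority, run_at) VALUES (?, ?, ?)",
        (name, priority, now + delay),
    )
    return cur.lastrowid


def claim_next(now):
    """Mark the highest-priority due task as running and return its name, or None."""
    row = conn.execute(
        "SELECT id, name FROM tasks WHERE status = 'pending' AND run_at <= ? "
        "ORDER BY priority DESC, id ASC LIMIT 1",
        (now,),
    ).fetchone()
    if row is None:
        return None
    conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],))
    return row[1]


enqueue("newsletter", now=0.0)
enqueue("password_reset", now=0.0, priority=10)
enqueue("reminder", now=0.0, priority=99, delay=60)  # High priority, but not due yet

order = [claim_next(1.0), claim_next(1.0), claim_next(1.0), claim_next(61.0)]
print(order)
```

The delayed task is skipped until its `run_at` time passes, regardless of priority. A queue shared by several workers would also need an atomic claim step, which this single-connection sketch leaves out.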

Getting Results

result = await send_email.get_result(task_id, timeout=60)

if result.is_completed:
    print(f"Success: {result.value}")
elif result.is_failed:
    print(f"Error: {result.error}")
elif result.is_pending:
    print("Still processing...")
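
The `timeout` argument suggests a wait-then-inspect pattern. The sketch below shows one generic way to poll a status until it settles or a deadline passes. `wait_for` and `fetch_status` are hypothetical helpers, and how `get_result()` waits internally is not documented here.

```python
import asyncio
import time


async def wait_for(fetch_status, timeout, poll_interval=0.01):
    """Poll fetch_status() until it leaves pending/running or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status not in ("pending", "running"):
            return status  # Terminal state reached.
        if time.monotonic() >= deadline:
            return status  # Timed out: hand back the unfinished status.
        await asyncio.sleep(poll_interval)


async def demo():
    # Simulated status sequence a task might go through.
    states = iter(["pending", "running", "completed"])

    async def fetch_status():
        return next(states)

    return await wait_for(fetch_status, timeout=1.0)


final = asyncio.run(demo())
print(final)
```

Returning the last seen status on timeout, rather than raising, matches the shape of the branch on `is_pending` above, though that is an assumption about the design.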

TaskContext for Progress

Report progress from long-running tasks:

from myfy.tasks import task, TaskContext

@task
async def import_users(file_path: str, ctx: TaskContext) -> int:
    users = load_users_from_file(file_path)
    total = len(users)

    for i, user in enumerate(users):
        await create_user(user)
        await ctx.update_progress(
            current=i + 1,
            total=total,
            message=f"Importing user {i + 1}/{total}",
        )

    return total

Check progress from the caller:

result = await import_users.get_result(task_id)
if result.progress:
    current, total = result.progress
    print(f"Progress: {current}/{total} - {result.progress_message}")

Running Workers

Start a worker process:

myfy tasks worker

With options:

myfy tasks worker --concurrency 8 --poll-interval 0.5

Workers:

  • Poll the database for pending tasks
  • Execute tasks with full DI injection
  • Handle retries automatically
  • Report progress and results
  • Shut down gracefully on SIGTERM
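
The worker responsibilities listed above can be sketched as a small asyncio loop: poll for work, cap concurrency with a semaphore, and drain in-flight tasks when asked to stop. This is an illustrative stand-in, not myfy's worker; `worker_loop`, the in-memory `queue` list, and the `stop` event are all assumptions.

```python
import asyncio


async def worker_loop(queue, handler, stop, concurrency=4, poll_interval=0.01):
    """Run handler(item) for queued items until stop is set, then drain."""
    slots = asyncio.Semaphore(concurrency)
    running = set()

    async def run_one(item):
        try:
            await handler(item)
        finally:
            slots.release()  # Free the slot even if the handler raised.

    while not stop.is_set():
        await slots.acquire()  # Blocks while all slots are busy.
        if stop.is_set() or not queue:
            slots.release()
            await asyncio.sleep(poll_interval)  # Nothing to claim: wait for next poll.
            continue
        task = asyncio.create_task(run_one(queue.pop(0)))
        running.add(task)
        task.add_done_callback(running.discard)

    # Graceful shutdown: stop claiming new work, let in-flight tasks finish.
    if running:
        await asyncio.gather(*running)


async def demo():
    stop = asyncio.Event()
    queue = list(range(6))
    done = []
    active = 0
    peak = 0

    async def handler(item):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        done.append(item)
        if len(done) == 6:
            stop.set()  # Stand-in for receiving SIGTERM.

    await worker_loop(queue, handler, stop, concurrency=2)
    return sorted(done), peak


done, peak = asyncio.run(demo())
print(done, peak)
```

In a real Unix process, `loop.add_signal_handler(signal.SIGTERM, stop.set)` is one way to connect the signal to the stop event.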

Task States

| Status | Description |
| --- | --- |
| pending | Queued, waiting for worker |
| running | Being executed by worker |
| completed | Finished successfully |
| failed | Failed after all retries |
| cancelled | Manually cancelled |
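
One way to reason about these states is as a small state machine. The status values below come from the table; the allowed transitions are an assumption (for example, that a retry moves a task from running back to pending), since the edges are not spelled out here.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed edges; only the state names are taken from the table above.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    TaskStatus.RUNNING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.PENDING,  # Re-queued for a retry.
        TaskStatus.CANCELLED,
    },
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current, target):
    """Return True if moving from current to target is allowed in this sketch."""
    return TaskStatus(target) in ALLOWED[TaskStatus(current)]


def is_terminal(status):
    """A status is terminal when no further transitions leave it."""
    return not ALLOWED[TaskStatus(status)]


print(can_transition("pending", "running"), is_terminal("failed"))
```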

Error Handling

Tasks automatically retry on failure:

@task(max_retries=3, retry_on=[APIError])
async def call_api(url: str) -> dict:
    response = await http.get(url)
    if response.status >= 500:
        raise APIError("Server error")  # Will retry
    return response.json()

After all retries fail:

  • Task status becomes failed
  • Error message and traceback are stored
  • Both can be retrieved via get_result()
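
The retry behavior above can be pictured with a plain asyncio loop. This sketch assumes `max_retries` counts retries after the first attempt (so at most `max_retries + 1` calls) and that exceptions outside `retry_on` fail immediately; myfy's exact semantics may differ. `run_with_retries` and `flaky` are hypothetical names.

```python
import asyncio


async def run_with_retries(func, *args, max_retries=3, retry_on=(Exception,), retry_delay=0.0):
    """Call func, retrying on the listed exception types up to max_retries times."""
    attempt = 0
    while True:
        try:
            return await func(*args)
        except tuple(retry_on):
            if attempt >= max_retries:
                raise  # Retries exhausted: a worker would mark the task failed here.
            attempt += 1
            await asyncio.sleep(retry_delay)


calls = []


async def flaky(url):
    """Fail twice with a retryable error, then succeed."""
    calls.append(url)
    if len(calls) < 3:
        raise ConnectionError("temporary outage")
    return {"url": url, "attempts": len(calls)}


result = asyncio.run(
    run_with_retries(flaky, "https://example.com", max_retries=3, retry_on=[ConnectionError])
)
print(result)
```

An exception type that is not in `retry_on` propagates on the first attempt, which is why narrowing `retry_on` to transient errors keeps genuine bugs from being retried.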

Parameter Classification

| Type | Behavior |
| --- | --- |
| Primitives (str, int, float, bool) | Serialized as task args |
| Lists, dicts | Serialized as task args |
| TaskContext | Injected by worker |
| Services (other types) | DI injected at runtime |

@task
async def complex_task(
    order_id: int,           # Serialized (primitive)
    items: list[str],        # Serialized (list)
    ctx: TaskContext,        # Injected (context)
    session: AsyncSession,   # DI injected (service)
    settings: AppSettings,   # DI injected (service)
) -> None:
    ...
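
A classification like the one in the table can be derived from type hints. The sketch below is a simplified illustration using `inspect` and `typing`; `classify_params` is hypothetical, and the `TaskContext` and `AsyncSession` classes are local stand-ins so the example runs without myfy installed.

```python
import inspect
from typing import get_origin, get_type_hints

SERIALIZABLE = (str, int, float, bool, list, dict)


class TaskContext:
    """Stand-in for myfy.tasks.TaskContext."""


class AsyncSession:
    """Stand-in for a DI-provided service."""


def classify_params(func):
    """Map each parameter name to 'serialized', 'context', or 'injected'."""
    hints = get_type_hints(func)
    result = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        base = get_origin(annotation) or annotation  # list[str] -> list
        if base is TaskContext:
            result[name] = "context"
        elif base in SERIALIZABLE:
            result[name] = "serialized"
        else:
            result[name] = "injected"  # Anything else is treated as a service.
    return result


async def complex_task(
    order_id: int,
    items: list[str],
    ctx: TaskContext,
    session: AsyncSession,
) -> None:
    ...


classification = classify_params(complex_task)
print(classification)
```

In this sketch an unannotated parameter falls through to "injected"; a real implementation would more likely reject it or treat it as a serialized argument.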

Best Practices

  1. Keep tasks idempotent - Safe to retry on failure
  2. Serialize only simple values - Pass primitives and IDs; load complex objects inside the task
  3. Use TaskContext - Report progress for long tasks
  4. Set appropriate timeouts - Prevent zombie tasks
  5. Monitor worker logs - Watch for repeated failures
  6. Use priorities - Critical tasks get processed first
  7. Handle cleanup - TaskContext supports cleanup callbacks
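
To illustrate the first practice, here is a minimal sketch of an idempotent side effect guarded by a deduplication key. The in-memory `processed` set stands in for something durable such as a unique-key table, and `send_welcome_email` is a hypothetical function, not part of myfy.

```python
processed = set()  # Stand-in for a durable store of completed keys.
sent = []  # Records the side effect so the example can be inspected.


def send_welcome_email(user_id):
    """Send at most once per user, even if the task is retried."""
    key = f"welcome:{user_id}"
    if key in processed:
        return False  # An earlier attempt already did the work.
    sent.append(user_id)
    processed.add(key)
    return True


first = send_welcome_email(42)
second = send_welcome_email(42)  # Simulated retry of the same task.
print(first, second, sent)
```

With the guard in place, a retry after a partial failure repeats the check but not the email.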

Metadata

License: unknown
Version: -
Updated: 1/12/2026
Publisher: psincraian

Tags

api, database