askill
langgraph-workflows

langgraph-workflowsSafety 95Repository

Understanding workflows vs agents, predetermined vs dynamic patterns, and orchestrator-worker patterns using the Send API

2 stars
1.2k downloads
Updated 2/14/2026

Package Files

Loading files...
SKILL.md

langgraph-workflows (Python)


name: langgraph-workflows description: Understanding workflows vs agents, predetermined vs dynamic patterns, and orchestrator-worker patterns using the Send API

Overview

LangGraph supports both workflows (predetermined paths) and agents (dynamic decision-making). Understanding when to use each pattern is crucial for effective agent design.

Key Distinctions:

  • Workflows: Predetermined code paths, operate in specific order
  • Agents: Dynamic, define their own processes and tool usage
  • Hybrid: Combine deterministic and agentic steps

Decision Table: Workflow vs Agent

CharacteristicWorkflowAgentHybrid
Control FlowFixed, predeterminedDynamic, model-drivenMixed
PredictabilityHighLowMedium
ComplexitySimpleComplexVariable
Use CaseSequential tasksOpen-ended problemsStructured flexibility
ExamplesETL, validationResearch, QAReview approval

Key Patterns

1. Predetermined Workflows

Sequential execution with fixed paths:

  • Data processing pipelines
  • Validation workflows
  • Multi-step transformations

2. Dynamic Agents

Model decides next steps:

  • ReAct agents (reasoning + acting)
  • Tool-calling loops
  • Autonomous task completion

3. Orchestrator-Worker Pattern

One coordinator delegates to multiple workers:

  • Map-reduce operations
  • Parallel processing
  • Multi-agent collaboration

Code Examples

Basic Workflow (Predetermined)

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class WorkflowState(TypedDict):
    data: str
    validated: bool
    processed: bool

def validate(state: WorkflowState) -> dict:
    """Validate input data."""
    is_valid = len(state["data"]) > 0
    return {"validated": is_valid}

def process(state: WorkflowState) -> dict:
    """Process validated data."""
    return {
        "data": state["data"].upper(),
        "processed": True
    }

# Fixed workflow: validate → process
workflow = (
    StateGraph(WorkflowState)
    .add_node("validate", validate)
    .add_node("process", process)
    .add_edge(START, "validate")
    .add_edge("validate", "process")  # Always go to process
    .add_edge("process", END)
    .compile()
)

result = workflow.invoke({"data": "hello"})
print(result)  # {'data': 'HELLO', 'validated': True, 'processed': True}

Dynamic Agent (Model-Driven)

from langchain.tools import tool
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from typing import Annotated
import operator

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Calculate a mathematical expression."""
    return str(eval(expression))

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]

model = init_chat_model("claude-sonnet-4-5-20250929")
tools = [search, calculate]
model_with_tools = model.bind_tools(tools)

def agent_node(state: AgentState) -> dict:
    """Agent decides which tool to use (if any)."""
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def tool_node(state: AgentState) -> dict:
    """Execute tools chosen by agent."""
    tools_by_name = {tool.name: tool for tool in tools}
    result = []
    
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    
    return {"messages": result}

def should_continue(state: AgentState):
    """Dynamic: agent decides if it needs more tools."""
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

# Dynamic agent: model decides when to stop
agent = (
    StateGraph(AgentState)
    .add_node("agent", agent_node)
    .add_node("tools", tool_node)
    .add_edge(START, "agent")
    .add_conditional_edges("agent", should_continue)  # Model decides
    .add_edge("tools", "agent")
    .compile()
)

Orchestrator-Worker Pattern

from langgraph.types import Send
from typing import Annotated
import operator

class OrchestratorState(TypedDict):
    tasks: list[str]
    results: Annotated[list, operator.add]

def orchestrator(state: OrchestratorState):
    """Fan out tasks to workers."""
    return [
        Send("worker", {"task": task})
        for task in state["tasks"]
    ]

def worker(state: dict) -> dict:
    """Individual worker processes one task."""
    task = state["task"]
    result = f"Completed: {task}"
    return {"results": [result]}

def synthesize(state: OrchestratorState) -> dict:
    """Combine worker outputs."""
    summary = f"Processed {len(state['results'])} tasks"
    return {"summary": summary}

graph = (
    StateGraph(OrchestratorState)
    .add_node("worker", worker)
    .add_node("synthesize", synthesize)
    .add_conditional_edges(START, orchestrator, ["worker"])
    .add_edge("worker", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

result = graph.invoke({
    "tasks": ["Task A", "Task B", "Task C"]
})
print(result["summary"])  # "Processed 3 tasks"

Hybrid: Workflow with Agent Step

class HybridState(TypedDict):
    input: str
    validated: bool
    agent_response: str
    finalized: bool

def validate(state: HybridState) -> dict:
    """Fixed validation step."""
    return {"validated": True}

def agent_process(state: HybridState) -> dict:
    """Dynamic: agent decides how to process."""
    # Agent logic here
    response = f"Agent processed: {state['input']}"
    return {"agent_response": response}

def finalize(state: HybridState) -> dict:
    """Fixed finalization step."""
    return {"finalized": True}

# Hybrid: validate → agent → finalize
hybrid = (
    StateGraph(HybridState)
    .add_node("validate", validate)      # Workflow
    .add_node("agent", agent_process)    # Agent
    .add_node("finalize", finalize)      # Workflow
    .add_edge(START, "validate")
    .add_edge("validate", "agent")
    .add_edge("agent", "finalize")
    .add_edge("finalize", END)
    .compile()
)

Map-Reduce Example

class MapReduceState(TypedDict):
    documents: list[str]
    summaries: Annotated[list, operator.add]
    final_summary: str

def map_documents(state: MapReduceState):
    """Map: send each document to a worker."""
    return [
        Send("summarize", {"doc": doc})
        for doc in state["documents"]
    ]

def summarize(state: dict) -> dict:
    """Worker: summarize one document."""
    doc = state["doc"]
    summary = f"Summary of: {doc[:50]}..."
    return {"summaries": [summary]}

def reduce(state: MapReduceState) -> dict:
    """Reduce: combine all summaries."""
    final = " | ".join(state["summaries"])
    return {"final_summary": final}

graph = (
    StateGraph(MapReduceState)
    .add_node("summarize", summarize)
    .add_node("reduce", reduce)
    .add_conditional_edges(START, map_documents, ["summarize"])
    .add_edge("summarize", "reduce")
    .add_edge("reduce", END)
    .compile()
)

result = graph.invoke({
    "documents": ["Doc 1 content...", "Doc 2 content...", "Doc 3 content..."]
})

Parallel Knowledge Base Router

from langgraph.types import Send

class RouterState(TypedDict):
    query: str
    sources: list[str]
    results: Annotated[list, operator.add]

def classify(state: RouterState) -> dict:
    """Determine which sources to query."""
    query = state["query"].lower()
    sources = []
    
    if "code" in query:
        sources.append("github")
    if "doc" in query:
        sources.append("notion")
    if "message" in query:
        sources.append("slack")
    
    return {"sources": sources}

def route_to_sources(state: RouterState):
    """Fan out to relevant sources."""
    return [
        Send(source, {"query": state["query"]})
        for source in state["sources"]
    ]

def query_github(state: dict) -> dict:
    return {"results": [f"GitHub: {state['query']}"]}

def query_notion(state: dict) -> dict:
    return {"results": [f"Notion: {state['query']}"]}

def query_slack(state: dict) -> dict:
    return {"results": [f"Slack: {state['query']}"]}

def synthesize(state: RouterState) -> dict:
    return {"final": " + ".join(state["results"])}

graph = (
    StateGraph(RouterState)
    .add_node("classify", classify)
    .add_node("github", query_github)
    .add_node("notion", query_notion)
    .add_node("slack", query_slack)
    .add_node("synthesize", synthesize)
    .add_edge(START, "classify")
    .add_conditional_edges("classify", route_to_sources)
    .add_edge("github", "synthesize")
    .add_edge("notion", "synthesize")
    .add_edge("slack", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

Boundaries

What You CAN Configure

✅ Choose workflow vs agent pattern ✅ Mix deterministic and agentic steps ✅ Use Send API for parallel execution ✅ Define custom orchestrator logic ✅ Control worker node behavior ✅ Aggregate results with reducers

What You CANNOT Configure

❌ Change Send API message-passing model ❌ Bypass worker state isolation ❌ Modify parallel execution mechanism ❌ Override reducer behavior at runtime

Gotchas

1. Send Requires Worker State Isolation

# ❌ WRONG - Workers share state, causing conflicts
class State(TypedDict):
    shared_counter: int  # All workers modify same counter!

# ✅ CORRECT - Each worker gets isolated input
def worker(state: dict) -> dict:
    # state is isolated to this worker
    return {"results": [process(state["task"])]}

2. Send Needs Accumulator Reducer

# ❌ WRONG - Last worker overwrites all others
class State(TypedDict):
    results: list  # No reducer!

# ✅ CORRECT - Use Annotated with operator.add
from typing import Annotated
import operator

class State(TypedDict):
    results: Annotated[list, operator.add]  # Accumulates

3. Workflows Can Become Too Rigid

# ❌ ANTI-PATTERN - Overly rigid workflow
# What if validation fails? No recovery path!
.add_edge("validate", "process")  # Always proceeds

# ✅ BETTER - Add conditional logic
def route_after_validate(state):
    if not state["validated"]:
        return "error_handler"
    return "process"

.add_conditional_edges("validate", route_after_validate)

4. Agents Can Become Unpredictable

# ❌ RISKY - Pure agent, no guardrails
# Agent might loop forever or make bad choices

# ✅ BETTER - Hybrid with constraints
def should_continue(state):
    # Add max iterations check
    if state["iterations"] > 10:
        return END
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

Links

Install

Download ZIP
Requires askill CLI v1.0+

AI Quality Score

92/100Analyzed 2/18/2026

Comprehensive technical reference covering LangGraph workflow patterns with excellent code examples, decision framework, and practical gotchas. Well-structured with clear distinctions between workflows, agents, and hybrid approaches. Includes working Python code for each pattern type and addresses common pitfalls. Minor issue with somewhat mismatched tags (github-actions not relevant). Overall high-quality, actionable skill document.

95
90
85
90
90

Metadata

Licenseunknown
Version-
Updated2/14/2026
Publisherchristian-bromann

Tags

apigithubgithub-actionsllm