Pipeline Design
Design complete post-training workflows integrating sdg_hub, training_hub, its_hub, and reward_hub.
Pipeline Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                         Post-Training Pipeline                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────────┐   │
│  │   sdg_hub    │───▶│ training_hub │───▶│  its_hub + reward_hub    │   │
│  │              │    │              │    │                          │   │
│  │  Generate    │    │  Fine-tune   │    │  Inference-time scaling  │   │
│  │  training    │    │  model       │    │  with reward selection   │   │
│  │  data        │    │              │    │                          │   │
│  └──────────────┘    └──────────────┘    └──────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Common Pipeline Patterns
Pattern 1: SDG → Training → Evaluation
Generate synthetic data, train a model, evaluate with reward models.
from datasets import load_dataset
from sdg_hub import FlowRegistry, Flow
from training_hub import sft
from reward_hub import AutoRM
# Step 1: Generate synthetic training data
seed_data = load_dataset("json", data_files="seed.jsonl", split="train")
flow = Flow.from_yaml(FlowRegistry.get_flow_path("math-cot"))
flow.set_model_config(model="gpt-4", api_base="...", api_key="...")
synthetic_data = flow.generate(seed_data)
synthetic_data.to_json("synthetic_train.jsonl")
# Step 2: Train the model
sft(
    model_path="meta-llama/Llama-3.1-8B",
    data_path="synthetic_train.jsonl",
    ckpt_output_dir="./checkpoints",
    effective_batch_size=128,
    num_epochs=3,
)
# Step 3: Evaluate with reward model
rm = AutoRM.load("Qwen/Qwen2.5-Math-PRM-7B", load_method="vllm")
eval_data = load_dataset("json", data_files="eval.jsonl", split="train")
scores = []
for example in eval_data:
    score = rm.score([
        {"role": "user", "content": example["prompt"]},
        {"role": "assistant", "content": example["model_output"]},
    ])
    scores.append(score)
print(f"Mean score: {sum(scores) / len(scores)}")
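The mean on the last line divides by `len(scores)`, so an empty evaluation set raises `ZeroDivisionError`, and a mean alone hides outliers. A minimal sketch of a safer summary, assuming (as the loop above does) that each score is a single number; `summarize_scores` is a hypothetical helper, not part of reward_hub:

```python
from statistics import mean


def summarize_scores(scores):
    """Return count, mean, min, and max for a list of numeric scores."""
    if not scores:
        # An empty eval set would otherwise fail on the division.
        return {"count": 0, "mean": None, "min": None, "max": None}
    return {
        "count": len(scores),
        "mean": mean(scores),
        "min": min(scores),
        "max": max(scores),
    }


# Stand-in values; in the pipeline these come from rm.score(...)
summary = summarize_scores([0.25, 0.5, 0.75])
print(summary)
```

Reporting the minimum alongside the mean makes it easier to spot a few very poor outputs that an average would smooth over.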
Pattern 2: Iterative Refinement
Use inference-time scaling (ITS) to select a better response for each prompt, then fine-tune on those responses.
from its_hub.algorithms import BestOfN
from its_hub.lms import OpenAICompatibleLanguageModel
from its_hub.integration.reward_hub import RewardHubORM
from reward_hub import AutoRM
from training_hub import sft
# Step 1: Load models
lm = OpenAICompatibleLanguageModel(
    model="meta-llama/Llama-3.1-8B-Instruct",
    base_url="http://localhost:8000/v1",
)
rm = AutoRM.load("internlm/internlm2-7b-reward", load_method="vllm")
orm = RewardHubORM(rm)
# Step 2: Generate improved responses using ITS
# train_prompts: an iterable of dicts with a "prompt" key, loaded beforehand
alg = BestOfN(orm=orm)
improved_data = []
for example in train_prompts:
    result = alg.infer(lm, example["prompt"], budget=16)
    improved_data.append({
        "messages": [
            {"role": "user", "content": example["prompt"]},
            {"role": "assistant", "content": result.answer},
        ]
    })
# Step 3: Save and train on improved data
# save_jsonl is not imported above; define a helper that writes one JSON object per line
save_jsonl(improved_data, "improved_train.jsonl")
sft(
    model_path="meta-llama/Llama-3.1-8B",
    data_path="improved_train.jsonl",
    ckpt_output_dir="./checkpoints",
)
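Pattern 2 calls `save_jsonl`, which none of the imports shown provide. One possible definition, using only the standard library, is sketched below; `load_jsonl` is an extra hypothetical helper included only to check the round trip:

```python
import json
import tempfile
from pathlib import Path


def save_jsonl(records, path):
    """Write an iterable of JSON-serializable dicts to `path`, one per line."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


example = [
    {"messages": [
        {"role": "user", "content": "What is 2 + 2?"},
        {"role": "assistant", "content": "4"},
    ]}
]

# Round trip through a temporary directory so the sketch leaves no files behind.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "improved_train.jsonl"
    save_jsonl(example, out)
    round_trip = load_jsonl(out)
```

Writing with `ensure_ascii=False` keeps non-ASCII text readable in the file; drop it if downstream tooling expects escaped output.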
Pattern 3: Full RLHF-style Pipeline
Generate data with SDG, run an initial SFT pass, build a reward model from preference data, then continue training with OSFT using that reward model.
from sdg_hub import Flow, FlowRegistry
from training_hub import sft, osft
from reward_hub import AutoRM, DrSow
# Phase 1: Generate diverse training data
flow = Flow.from_yaml(FlowRegistry.get_flow_path("diverse-instructions"))
synthetic_data = flow.generate(seed_data)
synthetic_data.to_json("sft_data.jsonl")
# Phase 2: Initial SFT
sft(
    model_path="meta-llama/Llama-3.1-8B",
    data_path="sft_data.jsonl",
    ckpt_output_dir="./checkpoints/sft",
)
# Phase 3: Create preference data & train reward model
# (Collect human preferences or use LLM-as-judge)
preference_rm = DrSow(
    base_model="meta-llama/Llama-3.1-8B",
    preference_data="./preferences.jsonl",
)
preference_rm.train()
# Phase 4: OSFT with reward guidance
osft(
    model_path="./checkpoints/sft",
    data_path="sft_data.jsonl",
    ckpt_output_dir="./checkpoints/osft",
    unfreeze_rank_ratio=0.3,
    reward_model=preference_rm,
)
Pattern 4: Math Reasoning Pipeline
Specialized pipeline for math problem solving.
from sdg_hub import Flow, FlowRegistry
from training_hub import sft
from its_hub.algorithms import BeamSearch
from its_hub.integration.reward_hub import RewardHubPRM
from reward_hub import AutoRM
# Generate math training data with chain-of-thought
flow = Flow.from_yaml(FlowRegistry.get_flow_path("math-cot"))
math_data = flow.generate(math_problems)
math_data.to_json("math_train.jsonl")
# Train model
sft(
    model_path="meta-llama/Llama-3.1-8B",
    data_path="math_train.jsonl",
    ckpt_output_dir="./checkpoints/math",
    max_seq_len=4096,  # Chain-of-thought solutions need a longer context
)
# Deploy with PRM-guided beam search
prm = AutoRM.load("Qwen/Qwen2.5-Math-PRM-7B", load_method="vllm")
prm_wrapper = RewardHubPRM(prm)
beam_search = BeamSearch(
    prm=prm_wrapper,
    beam_width=8,
    max_depth=10,
)
# Use for inference
# lm: a language-model client for the trained checkpoint, constructed as in Pattern 2
result = beam_search.infer(lm, "Solve: x^2 - 5x + 6 = 0", budget=32)
Data Flow Between Libraries
SDG → Training
# SDG output format
synthetic_data.to_json("train.jsonl")
# Format: {"messages": [{"role": "user", ...}, {"role": "assistant", ...}]}
# Training expects this format
sft(data_path="train.jsonl")
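Because training consumes whatever SDG writes, it can help to check each JSONL line against the messages format before starting a run. The following is a rough sketch using only the standard library; the allowed role set and the rule that a record should end with an assistant turn are assumptions made for illustration, not requirements stated by either library:

```python
import json

# Assumed role vocabulary; adjust to whatever your chat template accepts.
VALID_ROLES = {"system", "user", "assistant"}


def check_record(record):
    """Return a list of problems found in one messages-format record."""
    messages = record.get("messages") if isinstance(record, dict) else None
    if not isinstance(messages, list) or not messages:
        return ["missing or empty 'messages' list"]
    problems = []
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            problems.append(f"message {i}: not an object")
            continue
        if msg.get("role") not in VALID_ROLES:
            problems.append(f"message {i}: unexpected role {msg.get('role')!r}")
        if not isinstance(msg.get("content"), str) or not msg["content"].strip():
            problems.append(f"message {i}: empty or non-string content")
    # Assumption: SFT records should end with the response to learn.
    if isinstance(messages[-1], dict) and messages[-1].get("role") != "assistant":
        problems.append("last message is not from the assistant")
    return problems


def check_jsonl_lines(lines):
    """Map 1-based line number to its problems, for lines that fail."""
    report = {}
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            report[lineno] = [f"invalid JSON: {exc.msg}"]
            continue
        problems = check_record(record)
        if problems:
            report[lineno] = problems
    return report


sample = [
    '{"messages": [{"role": "user", "content": "2+2?"}, {"role": "assistant", "content": "4"}]}',
    '{"messages": [{"role": "user", "content": "2+2?"}]}',
    '{"prompt": "2+2?"}',
]
report = check_jsonl_lines(sample)
print(report)
```

To check a real file, pass an open file handle, for example `check_jsonl_lines(open("train.jsonl", encoding="utf-8"))`.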
Training → ITS
# Training produces checkpoint
sft(ckpt_output_dir="./checkpoints")
# Load checkpoint for ITS
from its_hub.lms import HuggingFaceLanguageModel
lm = HuggingFaceLanguageModel(model_path="./checkpoints/final")
ITS ↔ Reward
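# Reward models integrate via wrappers
from reward_hub import AutoRM
from its_hub.algorithms import BestOfN, BeamSearch
from its_hub.integration.reward_hub import RewardHubORM, RewardHubPRM
orm = RewardHubORM(AutoRM.load("..."))
prm = RewardHubPRM(AutoRM.load("..."))
# Use in algorithms: outcome reward model for best-of-N, process reward model for beam search
alg = BestOfN(orm=orm)
alg = BeamSearch(prm=prm)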
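Conceptually, best-of-N with an outcome reward model samples N candidate responses and keeps the one the reward model scores highest. The toy sketch below illustrates only that selection step, with stand-in `generate` and `score` callables; it is not its_hub's implementation and every name in it is hypothetical:

```python
import random


def best_of_n(generate, score, prompt, budget):
    """Sample `budget` candidates and return (best_candidate, best_score)."""
    if budget < 1:
        raise ValueError("budget must be at least 1")
    candidates = [generate(prompt) for _ in range(budget)]
    scores = [score(prompt, candidate) for candidate in candidates]
    # Index of the highest score; ties resolve to the earliest candidate.
    best_index = max(range(budget), key=scores.__getitem__)
    return candidates[best_index], scores[best_index]


# Toy stand-ins: a "model" that appends a random digit, and a "reward"
# that simply prefers larger digits.
rng = random.Random(0)


def toy_generate(prompt):
    return f"{prompt} -> {rng.randint(0, 9)}"


def toy_score(prompt, response):
    return int(response[-1])


answer, answer_score = best_of_n(toy_generate, toy_score, "pick a digit", budget=16)
print(answer, answer_score)
```

The `budget` argument plays the same role as in the calls above: a larger budget means more samples and more reward-model calls in exchange for a better chance of a high-scoring response.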
Pipeline Configuration Files
pipeline.yaml
name: math-reasoning-pipeline
version: 1.0
stages:
  - name: generate_data
    type: sdg
    config:
      flow: math-cot
      model: gpt-4
      output: ./data/synthetic.jsonl
  - name: train_base
    type: training
    depends_on: [generate_data]
    config:
      method: sft
      model: meta-llama/Llama-3.1-8B
      data: ./data/synthetic.jsonl
      output: ./checkpoints/base
  - name: train_refined
    type: training
    depends_on: [train_base]
    config:
      method: osft
      model: ./checkpoints/base
      data: ./data/synthetic.jsonl
      output: ./checkpoints/refined
      unfreeze_rank_ratio: 0.3
  - name: deploy
    type: inference
    depends_on: [train_refined]
    config:
      model: ./checkpoints/refined
      algorithm: beam_search
      reward_model: Qwen/Qwen2.5-Math-PRM-7B
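The `depends_on` fields describe a dependency graph, so a runner needs to resolve an execution order before launching stages. This document does not say which tool reads pipeline.yaml; the sketch below assumes the stages have already been parsed into Python dicts (for example with a YAML parser) and uses the standard-library `graphlib` module (Python 3.9+). `execution_order` is a hypothetical helper:

```python
from graphlib import CycleError, TopologicalSorter

# Stage names and dependencies as they appear in pipeline.yaml.
stages = [
    {"name": "generate_data", "depends_on": []},
    {"name": "train_base", "depends_on": ["generate_data"]},
    {"name": "train_refined", "depends_on": ["train_base"]},
    {"name": "deploy", "depends_on": ["train_refined"]},
]


def execution_order(stages):
    """Return stage names in an order that respects depends_on."""
    names = {stage["name"] for stage in stages}
    graph = {}
    for stage in stages:
        deps = stage.get("depends_on", [])
        unknown = [dep for dep in deps if dep not in names]
        if unknown:
            raise ValueError(f"{stage['name']} depends on unknown stage(s): {unknown}")
        graph[stage["name"]] = set(deps)
    # static_order() raises graphlib.CycleError if the dependencies loop.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(stages)
print(order)
```

Rejecting unknown dependency names up front catches typos in `depends_on` before any expensive stage starts.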
Best Practices
- Version your data: Track which synthetic data trained which model
- Checkpoint frequently: Save model states at each pipeline stage
- Validate between stages: Check data quality before training
- Use consistent formats: Stick to messages format throughout
- Monitor resource usage: Each stage has different GPU/memory needs
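One lightweight way to follow the first practice is to record a content hash of the training file next to each checkpoint, so a model can later be traced back to the exact data it saw. A minimal sketch under that assumption; the manifest fields and the `build_manifest` helper are illustrative, not a format any of the libraries define:

```python
import hashlib
import json
import tempfile
from pathlib import Path


def sha256_of_file(path, chunk_size=1 << 20):
    """Hash a file in chunks so large datasets do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(data_path, model_path, ckpt_output_dir):
    """Describe which data file (by content hash) was used to train which model."""
    return {
        "data_path": str(data_path),
        "data_sha256": sha256_of_file(data_path),
        "base_model": model_path,
        "ckpt_output_dir": ckpt_output_dir,
    }


# Demonstration on a throwaway file; in a pipeline, point this at the real JSONL.
content = b'{"messages": []}\n'
with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "synthetic_train.jsonl"
    data_file.write_bytes(content)
    manifest = build_manifest(data_file, "meta-llama/Llama-3.1-8B", "./checkpoints")

expected = hashlib.sha256(content).hexdigest()
print(json.dumps(manifest, indent=2))
```

Saving the manifest as JSON inside the checkpoint directory keeps the record with the model it describes.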
Debugging Pipelines
# Check data compatibility
from training_hub.utils import validate_data
errors = validate_data("synthetic.jsonl", model_path="meta-llama/Llama-3.1-8B")
# Verify model loads correctly
from its_hub.lms import HuggingFaceLanguageModel
lm = HuggingFaceLanguageModel(model_path="./checkpoints/final")
output = lm.generate("Test prompt", max_tokens=50)
# Test reward model
from reward_hub import AutoRM
rm = AutoRM.load("...", load_method="vllm")
score = rm.score([{"role": "user", "content": "test"}, {"role": "assistant", "content": "test"}])
Related Skills
- /sdg-run-flow: Execute SDG flows
- /training-configure: Configure training
- /its-setup: Set up inference-time scaling
- /reward-configure: Configure reward models
