askill
rust-actor

rust-actorSafety 90Repository

Actor 模型专家。处理 Actor 死锁, 消息传递, 状态管理, supervision, 容错, Actix, Erlang 风格并发

7 stars
1.2k downloads
Updated 2/8/2026

Package Files

Loading files...
SKILL.md

Actor 模型

核心问题

如何在并发系统中避免死锁并实现可靠的进程间通信?

Actor 模型通过消息传递和状态隔离来简化并发。


Actor vs 线程模型

特性线程模型Actor 模型
状态共享共享内存 + 锁无共享,消息传递
死锁风险高(锁顺序)低(消息队列)
扩展性受限于线程数可扩展到百万级
故障处理手动Supervision 树
调试难度难(竞态条件)相对容易(消息序列)

Actor 核心结构

// Actor 基础 trait
trait Actor: Send + 'static {
    type Message: Send + 'static;
    type Error: std::error::Error;
    
    fn receive(&mut self, ctx: &mut Context<Self::Message>, msg: Self::Message);
}

// Actor 上下文
struct Context<A: Actor> {
    mailbox: Receiver<A::Message>,
    sender: Sender<A::Message>,
    state: ActorState,
    supervisor: Option<SupervisorAddr>,
}

enum ActorState {
    Starting,
    Running,
    Restarting,
    Stopping,
    Stopped,
}

消息传递

// 同步消息
fn sync_request<A: Actor, R>(
    actor: &Addr<A>,
    msg: A::Message,
    timeout: Duration,
) -> Result<R, A::Error> {
    let (tx, rx) = channel();
    let request = Request {
        payload: msg,
        response: tx,
    };
    
    actor.send(request)?;
    
    rx.recv_timeout(timeout)?
}

// 异步消息
fn async_send<A: Actor>(actor: &Addr<A>, msg: A::Message) {
    actor.send(msg);
}

// 消息信封
enum Envelope<A: Actor> {
    Async(A::Message),
    Request {
        payload: A::Message,
        response: Sender<Result<A::Response, A::Error>>,
    },
    Signal(ActorSignal),
}

死锁预防

// 1. 避免循环等待:使用唯一消息顺序
enum GlobalMessage {
    // 按固定顺序排列
    UserMsg(UserMessage),
    SystemMsg(SystemMessage),
    InternalMsg(InternalMessage),
}

// 2. 超时机制
fn send_with_timeout<A: Actor, M: Send + 'static>(
    addr: &Addr<A>,
    msg: M,
    timeout: Duration,
) -> Result<(), SendError<M>> {
    let (tx, rx) = channel();
    
    addr.send(AsyncWrapper { msg, reply_to: tx });
    
    rx.recv_timeout(timeout)
        .map(|_| ())
        .map_err(|_| SendError::Timeout)
}

// 3. 限制邮箱大小(背压)
struct BoundedMailbox<A: Actor> {
    receiver: Receiver<A::Message>,
    sender: Sender<A::Message>,
    capacity: usize,
}

impl<A: Actor> Mailbox for BoundedMailbox<A> {
    fn capacity(&self) -> usize {
        self.capacity
    }
}

Supervision 树

// Supervision 策略
enum SupervisionStrategy {
    OneForOne,    // 只重启出错的子 actor
    AllForOne,    // 一个出错,全部重启
    RestForOne,   // 出错的和之后的重启
}

struct Supervisor {
    children: HashMap<ChildId, Child>,
    strategy: SupervisionStrategy,
    max_restarts: u32,
    window: Duration,
}

impl Supervisor {
    fn handle_child_error(&mut self, child_id: ChildId, error: &dyn std::error::Error) {
        let child = self.children.get_mut(&child_id).unwrap();
        child.restart_count += 1;
        
        if self.should_restart(child_id) {
            self.restart_child(child_id);
        } else {
            self.stop_child(child_id);
        }
    }
    
    fn should_restart(&self, child_id: ChildId) -> bool {
        let child = &self.children[&child_id];
        child.restart_count <= self.max_restarts
    }
}

状态管理

// Actor 内部状态
struct UserActor {
    id: UserId,
    session: Option<Session>,
    message_history: Vec<Message>,
    followers: HashSet<UserId>,
}

impl Actor for UserActor {
    type Message = UserMessage;
    
    fn receive(&mut self, ctx: &mut Context<Self::Message>, msg: Self::Message) {
        match msg {
            UserMessage::Login(session) => {
                self.session = Some(session);
            }
            UserMessage::Post(content) => {
                if let Some(session) = &self.session {
                    self.message_history.push(Message {
                        content,
                        timestamp: Utc::now(),
                        user: session.user_id,
                    });
                }
            }
            UserMessage::Follow(target_id) => {
                self.followers.insert(target_id);
            }
        }
    }
}

// 状态快照
impl UserActor {
    fn snapshot(&self) -> UserSnapshot {
        UserSnapshot {
            id: self.id,
            message_count: self.message_history.len(),
            followers_count: self.followers.len(),
            is_online: self.session.is_some(),
        }
    }
}

Actor 生命周期

// 生命周期事件
enum LifecycleEvent {
    PreStart,
    PostStart,
    PreRestart,
    PostRestart,
    PostStop,
}

trait LifecycleHandler: Actor {
    fn pre_start(&mut self, ctx: &mut Context<Self::Message>) {
        // 初始化资源
    }
    
    fn post_start(&mut self, ctx: &mut Context<Self::Message>) {
        // 启动定时器、连接等
    }
    
    fn pre_restart(&mut self, ctx: &mut Context<Self::Message>, error: &dyn std::error::Error) {
        // 清理资源
    }
    
    fn post_stop(&mut self) {
        // 保存状态、关闭连接
    }
}

Actix 示例

// Actix Web Actor
use actix::{Actor, Handler, Message, Context};

struct MyActor {
    counter: usize,
}

impl Actor for MyActor {
    type Context = Context<Self>;
    
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("Actor started");
    }
}

#[derive(Message)]
#[rtype(result = "usize")]
struct Increment;

impl Handler<Increment> for MyActor {
    type Result = usize;
    
    fn handle(&mut self, msg: Increment, _ctx: &mut Self::Context) -> Self::Result {
        self.counter += 1;
        self.counter
    }
}

// 使用
let actor = MyActor { counter: 0 }.start();
let result = actor.send(Increment).await?;

常见问题

问题原因解决
死锁循环消息等待超时、避免循环依赖
消息积压消费者慢背压、限流
内存泄漏Actor 未停止生命周期管理
状态不一致并发消息顺序处理、单线程

与其他技能关联

rust-actor
    │
    ├─► rust-concurrency → 并发模型
    ├─► rust-async → 异步消息
    └─► rust-error → 错误传播

Install

Download ZIP
Requires askill CLI v1.0+

AI Quality Score

94/100Analyzed 2/11/2026

A high-quality technical reference for implementing the Actor model in Rust. It covers core concepts, implementation patterns (supervision, lifecycle), and practical examples using Actix, while addressing critical concurrency issues like deadlocks and backpressure.

90
95
95
95
90

Metadata

Licenseunknown
Version-
Updated2/8/2026
PublisherNeverSight

Tags

No tags yet.