Skip to content

关于actor那些不得不说的故事

Published: at 18:16

Actix - actor in rust

Actor 创建并发送和接收消息


/// Actor state: a single running counter.
struct MyActor {
    /// Current value of the counter.
    count: usize,
}

///
/// `Actor` implementation for `MyActor`.
impl Actor for MyActor {
    // Every actor has an execution context; this one uses the plain `Context`.
    type Context = Context<Self>;
}

/// Message carrying a `usize` payload; its declared result type is `usize`.
#[derive(Message)]
#[rtype(result = "usize")]
struct Ping(usize);



impl Handler<Ping> for MyActor {
    type Result = usize;

    ///
    /// 接受Ping类型的消息 然后返回usize
    fn handle(&mut self, msg: Ping, ctx: &mut Self::Context) -> Self::Result {
        self.count += msg.0;
        self.count
    }
}

发送,接收处理


/// Starts a `MyActor`, sends it one `Ping`, and checks the reply.
#[actix::test]
async fn test1() {
    // Start a new actor; the returned address is roughly what Akka calls an `ActorRef`.
    let addr = MyActor { count: 10 }.start();

    // `send` hands the message to the handler and resolves to the handler's reply.
    let res = addr
        .send(Ping(10))
        .await
        .expect("MyActor should reply to Ping");

    println!("Res : {}", res);
    // The actor starts at 10 and the handler adds the payload (10).
    assert_eq!(res, 20);

    let id = System::current().id();
    println!("id:{} will stop", id);

    System::current().stop();
}

生命周期函数

///
/// Actor lifecycle states:
/// + Started
/// + Running
/// + Stopping
/// + Stopped
///
/// This impl overrides the `started` and `stopped` lifecycle hooks.
impl Actor for MineActor {
    type Context = Context<Self>;

    /// Prints a marker when the `started` hook is invoked.
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("started");
    }

    /// Prints a marker when the `stopped` hook is invoked.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("stopped");
    }
}


可Response的Message


///
/// Implements `MessageResponse` for `Responses` so that a handler can return
/// a `Responses` value directly as its result.
impl<A, M> MessageResponse<A, M> for Responses
where
    A: Actor,
    M: Message<Result = Responses>,
{
    /// Forwards `self` to the reply channel when one was supplied.
    ///
    /// `tx` is `None` when no reply channel was provided; in that case the
    /// value is simply dropped.
    fn handle(self, _ctx: &mut A::Context, tx: Option<actix::dev::OneshotSender<M::Result>>) {
        if let Some(tx) = tx {
            // `send` returns a `Result`; discard it explicitly instead of
            // silently. NOTE(review): presumably the only failure mode is a
            // receiver that was already dropped — confirm.
            let _ = tx.send(self);
        }
    }
}

两个Actor互相发送消息的结构


use actix::prelude::*;
use std::time::Duration;

/// Message with an `id` field; its declared result type is `()`.
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
    pub id: usize,
}

/// One player in the ping exchange.
struct Game {
    // Number of pings handled so far.
    counter: usize,
    // Label printed with each received ping.
    name: String,
    // Where this actor sends its pings (another actor).
    recipient: Recipient<Ping>,
}

/// `Game` runs on the plain actor `Context`.
impl Actor for Game {
    type Context = Context<Self>;
}

impl Handler<Ping> for Game {
    type Result = ();

    /// Counts the incoming ping; once more than 10 have been handled the
    /// system is stopped, otherwise the ping is logged and a follow-up ping
    /// with `id + 1` is scheduled for the recipient.
    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) {
        self.counter += 1;

        // Guard clause: past the limit, shut the whole system down.
        if self.counter > 10 {
            System::current().stop();
            return;
        }

        println!("[{0}] Ping received {1}", self.name, msg.id);

        let delay = Duration::from_nanos(100);
        ctx.run_later(delay, move |game, _| {
            // Send to the recipient, which in this example is the other `Game` actor.
            game.recipient.do_send(Ping { id: msg.id + 1 });
        });
    }
}

示例互啄术


///
/// Wires up two `Game` actors that ping each other, then runs the system
/// until one of them stops it.
fn main() {
    let system = System::new();

    // `Game::create` exposes the context before the actor value exists, so
    // Game 1's address can be handed to Game 2 while Game 1 is still being built.
    system.block_on(async {
        Game::create(|ctx| {
            // Address of Game 1 (the actor being created here).
            let addr = ctx.address();

            // Game 2 sends its pings to Game 1.
            let addr2 = Game {
                counter: 0,
                name: String::from("Game 2"),
                // Recipient pointing at Game 1.
                recipient: addr.recipient(),
            }
            .start();

            // Kick off the exchange: Game 2 gets the first ping and is
            // therefore the first to forward one.
            addr2.do_send(Ping { id: 10 });

            // Game 1 sends its pings to Game 2.
            Game {
                counter: 0,
                name: String::from("Game 1"),
                recipient: addr2.recipient(),
            }
        });
    });

    // `run` reports how the system exited; surface a failure instead of
    // dropping the result on the floor.
    if let Err(err) = system.run() {
        eprintln!("actix system exited with an error: {}", err);
        std::process::exit(1);
    }
}
Arbiter
    let sys = System::new();

    // Async block that starts the actor once it is polled.
    let exec = async {
        TheActor.start();
    };

    // Use an Arbiter to manage the actor.
    let arbiter = Arbiter::new();

    // Hand the future to the arbiter.
    Arbiter::spawn(&arbiter, exec);


    // NOTE(review): stop is requested before `sys.run()` is called, so the
    // run below presumably returns almost immediately — confirm this is the
    // intended demo behavior.
    System::current().stop();

    sys.run();
SyncArbiter

use actix::prelude::*;

/// Stateless actor used for the `SyncArbiter` example.
struct MySyncActor;

/// `MySyncActor` uses `SyncContext` as its context type rather than the
/// plain `Context`.
impl Actor for MySyncActor {
    type Context = SyncContext<Self>;
}
// With a thread count of 2, two actor instances can be processing at the same time.
let addr = SyncArbiter::start(2, || MySyncActor);

Akka - actor in jvm

Apache Pekko

创建
import akka.actor.AbstractActor;
import akka.actor.Props;

import java.time.Duration;

/**
 * Demo receiver: replies to String and Integer messages and prints a marker
 * for anything else.
 */
public class DemoRev extends AbstractActor {

    /** Creates the actor and sets a 10-second receive timeout on its context. */
    public DemoRev() {
        getContext().setReceiveTimeout(Duration.ofSeconds(10));
    }

    /**
     * Builds the Props used to create an ActorRef for this actor.
     */
    public static Props props() {
        return Props.create(DemoRev.class, DemoRev::new);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, this::onText)     // String messages
                .match(Integer.class, this::onNumber)  // Integer messages
                .matchAny(other -> System.out.println("any")) // everything else
                .build();
    }

    /** Prints the received text and acknowledges it to the sender. */
    private void onText(String text) {
        System.out.println("rev :" + text);
        getSender().tell("rev done", getSelf());
    }

    /** Replies to the sender without inspecting the number. */
    private void onNumber(Integer ignored) {
        getSender().tell("give me more!", getSelf());
    }
}

传递消息
ActorSystem system = ActorSystem.create("linux");
// Create the two actors.
ActorRef p1 = system.actorOf(DemoRev.props());
ActorRef s1 = system.actorOf(DemoSend.props());
// Deliver "hello" to p1 with s1 passed as the sender (s1 -> p1).
p1.tell("hello",s1);
// NOTE(review): terminate() is called right after the tell — presumably the
// message may not be processed before shutdown; confirm this is acceptable.
system.terminate();

Inbox 消息


ActorSystem system = ActorSystem.create("linux");
ActorRef p1 = system.actorOf(DemoRev.props());

final Inbox inbox = Inbox.create(system);
// The inbox is itself an actor, so it can send to p1 and receive the reply.
inbox.send(p1,"hello");

// Wait up to one second for a reply and print it.
System.out.println(inbox.receive(Duration.ofSeconds(1)));
system.terminate();

周期性消息

import akka.actor.AbstractActorWithTimers;

import java.time.Duration;

/**
 * Demo of actor timers: a single-shot timer fires first, and its handler
 * then starts a periodic timer under the same key.
 */
public class DemoTimer extends AbstractActorWithTimers {
    /** Key shared by both timers; constant, so declared final. */
    private static final Object TICK_KEY = "TickKey";

    /** Message delivered once by the initial single-shot timer. */
    private static final class FirstTick {}

    /** Message delivered on every period of the repeating timer. */
    private static final class Tick {}

    public DemoTimer() {
        // Comparable to JavaScript's setTimeout: deliver FirstTick once after 500 ms.
        getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        FirstTick.class,
                        message -> {
                            // Switch to periodic delivery: one Tick every second.
                            // NOTE(review): startPeriodicTimer is deprecated in Akka 2.6+
                            // in favor of startTimerAtFixedRate / startTimerWithFixedDelay —
                            // confirm against the Akka/Pekko version in use.
                            getTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1));
                        })
                .match(
                        Tick.class,
                        message -> {
                            System.out.println(message);
                        })
                .build();
    }
}


生命周期

import akka.actor.AbstractActor;
import akka.actor.Props;

/**
 * Demo of the lifecycle hooks: logs on start and stop, creates a child actor
 * on start, and stops itself when it receives "stop".
 */
public class StartStopActor1 extends AbstractActor {

    /** Builds the Props used to create this actor. */
    static Props props() {
        return Props.create(StartStopActor1.class, StartStopActor1::new);
    }

    /** Start hook: prints this actor's path, then creates the child named "second". */
    @Override
    public void preStart() throws Exception {
        String selfPath = getSelf().path().toSerializationFormat();
        System.out.printf("start %s \n", selfPath);
        getContext().actorOf(StartStopActor2.props(), "second");
    }

    /** Stop hook: prints this actor's path. */
    @Override
    public void postStop() throws Exception {
        String selfPath = getSelf().path().toSerializationFormat();
        System.out.printf("stop %s \n", selfPath);
    }

    /*
     * An actor can also be stopped by sending it a signal message:
     *   victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
     */

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("stop", msg -> getContext().stop(getSelf()))
                .build();
    }
}

Receive

// 支持动态改变receive方法
public Receive createReceive() {
return receiveBuilder()
    .matchEquals(
        "init",
        m1 -> {
            initializeMe = "Up and running";
            getContext()
                .become(
                    receiveBuilder()
                        .matchEquals(
                            "U OK?",
                            m2 -> {
                            getSender().tell(initializeMe, getSelf());
                            })
                        .build());
        })
    .build();

ask, pipe



import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import scala.Tuple2;
import tech.realcpf.sendrev.DemoRev;
import tech.realcpf.sendrev.DemoSend;

import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Demo of the ask and pipe patterns: asks two actors, combines both replies
 * into a tuple, and pipes the tuple to a third actor.
 */
public class AskDemo {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sys");
        ActorRef actorA = system.actorOf(DemoRev.props());
        ActorRef actorB = system.actorOf(DemoRev.props());
        ActorRef actorC = system.actorOf(DemoRev.props());

        // Ask A and B, each with a 1000 ms timeout.
        CompletableFuture<Object> future1 =
                ask(actorA, "hi A", Duration.ofMillis(1000)).toCompletableFuture();
        CompletableFuture<Object> future2 =
                ask(actorB, "hi B", Duration.ofMillis(1000)).toCompletableFuture();

        // Once both asks have completed, pair their replies up.
        CompletableFuture<Tuple2<String, String>> transformed =
                CompletableFuture.allOf(future1, future2)
                        .thenApply(v -> {
                            String x = (String) future1.join();
                            String s = (String) future2.join();
                            // Diamond operator instead of the raw Tuple2 type.
                            return new Tuple2<>(x, s);
                        });

        // Deliver the combined result to actorC when it is ready.
        pipe(transformed, system.dispatcher()).to(actorC);

        // NOTE(review): terminate() is called without waiting for the asks or
        // the pipe to complete — presumably acceptable for a demo, but the
        // replies may never be delivered; confirm.
        system.terminate();
    }
}