Actix - actor in rust
Actor 创建并发送和接收消息
/// Actor state: a single `usize` counter.
struct MyActor {
    count: usize,
}
/// Registers `MyActor` as an actor.
impl Actor for MyActor {
    /// Every actor has an execution context; this one uses `Context<MyActor>`.
    type Context = Context<MyActor>;
}
/// Message that carries a `usize`; the `rtype` attribute declares a `usize` reply.
#[derive(Message)]
#[rtype(result = "usize")]
struct Ping(usize);
impl Handler<Ping> for MyActor {
type Result = usize;
///
/// 接受Ping类型的消息 然后返回usize
fn handle(&mut self, msg: Ping, ctx: &mut Self::Context) -> Self::Result {
self.count += msg.0;
self.count
}
}
发送,接收处理
/// Starts a `MyActor`, sends it one `Ping`, prints the reply, then stops the
/// current system.
#[actix::test]
async fn test1() {
    // `start()` launches the actor and returns its address (roughly the
    // counterpart of Akka's `ActorRef`).
    let addr = MyActor { count: 10 }.start();
    // `send` delivers the message; awaiting it yields what the handler returned.
    let reply = addr.send(Ping(10)).await.unwrap();
    println!("Res : {}", reply);
    let system = System::current();
    print!("id:{} will stop", system.id());
    system.stop();
}
生命周期函数
/// Actor lifecycle states:
/// + Started
/// + Running
/// + Stopping
/// + Stopped
///
/// This impl overrides the `started` and `stopped` lifecycle hooks so each
/// one prints a line.
impl Actor for MineActor {
    type Context = Context<MineActor>;

    /// Prints `started`.
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("started");
    }

    /// Prints `stopped`.
    fn stopped(&mut self, ctx: &mut Self::Context) {
        println!("stopped");
    }
}
可Response的Message
/// Implements `MessageResponse` for `Responses` so that a handler can use
/// `Responses` as its reply type.
impl<A, M> MessageResponse<A, M> for Responses
where
    A: Actor,
    M: Message<Result = Responses>,
{
    /// Forwards `self` through the reply channel when one was supplied.
    ///
    /// `tx` is `None` when no reply channel exists; in that case the value is
    /// simply dropped.
    fn handle(
        self,
        _ctx: &mut <A as Actor>::Context,
        tx: Option<actix::dev::OneshotSender<<M as Message>::Result>>,
    ) {
        if let Some(tx) = tx {
            // `send` returns a `Result`; a failed send has no one left to
            // notify, so the outcome is discarded explicitly rather than
            // silently.
            let _ = tx.send(self);
        }
    }
}
两个Actor互相发的结构
use actix::prelude::*;
use std::time::Duration;
/// Message exchanged between `Game` actors; the `rtype` attribute declares a `()` reply.
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
/// Identifier printed by the receiving handler and incremented before forwarding.
pub id: usize,
}
/// One side of the ping-pong demo.
struct Game {
    /// Number of `Ping` messages handled so far.
    counter: usize,
    /// Label printed when a `Ping` arrives.
    name: String,
    /// Where this actor forwards its next `Ping` (another actor).
    recipient: Recipient<Ping>,
}

/// Registers `Game` as an actor running in a `Context<Game>`.
impl Actor for Game {
    type Context = Context<Self>;
}
/// `Ping` handling for `Game`.
impl Handler<Ping> for Game {
    type Result = ();

    /// Counts the incoming `Ping`. After more than 10 pings the system is
    /// stopped; otherwise the ping is logged and a follow-up `Ping` with
    /// `id + 1` is scheduled for `recipient`.
    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) {
        self.counter += 1;

        if self.counter > 10 {
            System::current().stop();
            return;
        }

        println!("[{0}] Ping received {1}", self.name, msg.id);
        ctx.run_later(Duration::from_nanos(100), move |game, _| {
            // Forward to `recipient`; in this example that is the other Game actor.
            game.recipient.do_send(Ping { id: msg.id + 1 });
        });
    }
}
示例互啄术
/// Ping-pong demo: two `Game` actors that keep forwarding `Ping` to each other.
fn main() {
    let system = System::new();

    system.block_on(async {
        // `create` hands out the context before the actor value exists, so
        // Game 1's address can be given to Game 2 up front.
        Game::create(|ctx| {
            // Address of Game 1 (the actor being created).
            let game1_addr = ctx.address();

            // Game 2 forwards its pings to Game 1.
            let game2_addr = Game {
                counter: 0,
                name: String::from("Game 2"),
                recipient: game1_addr.recipient(),
            }
            .start();

            // Kick off the exchange by delivering the first Ping to Game 2.
            game2_addr.do_send(Ping { id: 10 });

            // Game 1 forwards its pings to Game 2.
            Game {
                counter: 0,
                name: String::from("Game 1"),
                recipient: game2_addr.recipient(),
            }
        });
    });

    // `run` blocks until the system stops and reports how it exited; surface a
    // failure instead of discarding the result.
    if let Err(err) = system.run() {
        eprintln!("actix system exited with an error: {}", err);
    }
}
Arbiter
let sys = System::new();
// Use an Arbiter to manage the actor: the future that starts `TheActor` is
// spawned onto `arbiter`.
let arbiter = Arbiter::new();
arbiter.spawn(async {
    TheActor.start();
});
System::current().stop();
// `run` reports how the system exited; surface a failure instead of
// discarding the result.
if let Err(err) = sys.run() {
    eprintln!("actix system exited with an error: {}", err);
}
SyncArbiter
use actix::prelude::*;
/// Stateless actor that runs in a `SyncContext`.
struct MySyncActor;

impl Actor for MySyncActor {
    type Context = SyncContext<Self>;
}

// With 2 threads, two actor instances can be processing messages at the same time.
let threads = 2;
let addr = SyncArbiter::start(threads, || MySyncActor);
Akka - actor in jvm
Apache Pekko
创建
import akka.actor.AbstractActor;
import akka.actor.Props;
import java.time.Duration;
/**
 * Demo receiver: replies to String and Integer messages and prints "any" for
 * everything else.
 */
public class DemoRev extends AbstractActor {

    public DemoRev() {
        // Set the receive timeout to 10 seconds.
        getContext().setReceiveTimeout(Duration.ofSeconds(10));
    }

    /**
     * Props used to create an ActorRef for this actor.
     */
    public static Props props() {
        return Props.create(DemoRev.class, DemoRev::new);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, this::onString)
                .match(Integer.class, this::onInteger)
                .matchAny(this::onOther)
                .build();
    }

    /** String message: print it and acknowledge to the sender. */
    private void onString(String text) {
        System.out.println("rev :" + text);
        getSender().tell("rev done", getSelf());
    }

    /** Integer message: ask the sender for more. */
    private void onInteger(Integer number) {
        getSender().tell("give me more!", getSelf());
    }

    /** Any other message type. */
    private void onOther(Object other) {
        System.out.println("any");
    }
}
传递消息
ActorSystem system = ActorSystem.create("linux");
// Create the receiving and the sending actor.
ActorRef receiver = system.actorOf(DemoRev.props());
ActorRef sender = system.actorOf(DemoSend.props());
// sender -> receiver: "hello" is delivered to `receiver` with `sender` as the sender reference.
receiver.tell("hello", sender);
system.terminate();
Inbox 消息
ActorSystem system = ActorSystem.create("linux");
ActorRef target = system.actorOf(DemoRev.props());
// The inbox is itself an actor, so it can send a message and wait for the reply.
final Inbox inbox = Inbox.create(system);
inbox.send(target, "hello");
// Wait up to one second for the reply, then print it.
Object reply = inbox.receive(Duration.ofSeconds(1));
System.out.println(reply);
system.terminate();
周期性消息
import akka.actor.AbstractActorWithTimers;
import java.time.Duration;
/**
 * Demo of timer-driven messages: a single delayed FirstTick switches the
 * actor over to a periodic Tick.
 */
public class DemoTimer extends AbstractActorWithTimers {
    // Key shared by both timers; declared final because it is a constant.
    private static final Object TICK_KEY = "TickKey";

    private static final class FirstTick {}

    private static final class Tick {}

    public DemoTimer() {
        // One-shot timer (comparable to JavaScript's setTimeout): FirstTick after 500 ms.
        getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        FirstTick.class,
                        message -> {
                            // Switch to periodic execution: one Tick every second.
                            getTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1));
                        })
                .match(
                        Tick.class,
                        message -> {
                            System.out.println(message);
                        })
                .build();
    }
}
生命周期
import akka.actor.AbstractActor;
import akka.actor.Props;
/**
 * Lifecycle demo: prints its own path on start and stop, creates a child
 * named "second" on start, and stops itself when it receives "stop".
 */
public class StartStopActor1 extends AbstractActor {

    static Props props() {
        return Props.create(StartStopActor1.class, StartStopActor1::new);
    }

    /** This actor's path in serialization format. */
    private String selfPath() {
        return getSelf().path().toSerializationFormat();
    }

    // Start hook
    @Override
    public void preStart() throws Exception {
        System.out.printf("start %s \n", selfPath());
        getContext().actorOf(StartStopActor2.props(), "second");
    }

    // Stop hook
    @Override
    public void postStop() throws Exception {
        System.out.printf("stop %s \n", selfPath());
    }

    /*
      An actor can also be stopped by sending it a signal:
      victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("stop", s -> getContext().stop(getSelf()))
                .build();
    }
}
Receive
// 支持动态改变receive方法
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"init",
m1 -> {
initializeMe = "Up and running";
getContext()
.become(
receiveBuilder()
.matchEquals(
"U OK?",
m2 -> {
getSender().tell(initializeMe, getSelf());
})
.build());
})
.build();
ask, pipe
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import scala.Tuple2;
import tech.realcpf.sendrev.DemoRev;
import tech.realcpf.sendrev.DemoSend;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
/**
 * Demo of the ask and pipe patterns: asks two actors, combines both replies
 * into a tuple and pipes the result to a third actor.
 */
public class AskDemo {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sys");
        ActorRef actorA = system.actorOf(DemoRev.props());
        ActorRef actorB = system.actorOf(DemoRev.props());
        ActorRef actorC = system.actorOf(DemoRev.props());

        // Each ask is given 1000 ms.
        CompletableFuture<Object> future1 =
                ask(actorA, "hi A", Duration.ofMillis(1000)).toCompletableFuture();
        CompletableFuture<Object> future2 =
                ask(actorB, "hi B", Duration.ofMillis(1000)).toCompletableFuture();

        // Once both replies are in, pair them up. `join` does not block here
        // because `allOf` has already completed.
        CompletableFuture<Tuple2<String, String>> transformed =
                CompletableFuture.allOf(future1, future2)
                        .thenApply(v -> {
                            String x = (String) future1.join();
                            String s = (String) future2.join();
                            // Diamond instead of the raw type: no unchecked conversion.
                            return new Tuple2<>(x, s);
                        });

        pipe(transformed, system.dispatcher()).to(actorC);

        // Terminate only once the combined result is available (or has
        // failed). Terminating right away would race with the pending asks.
        // NOTE(review): actorC may still not get to process the piped message
        // before shutdown; confirm whether that matters for this demo.
        transformed.whenComplete((pair, failure) -> system.terminate());
    }
}