Actix - actor in rust#
Actor 创建并发送和接收消息#
/// Actor whose only state is a running counter.
struct MyActor {
    /// Running total; the `Ping` handler adds each message's payload to it.
    count: usize,
}
/// Makes `MyActor` an actor.
impl Actor for MyActor {
    // Every actor declares a context type.
    type Context = Context<Self>;
}
/// Message carrying a `usize` payload; its declared reply type is `usize`.
#[derive(Message)]
#[rtype(result = "usize")]
struct Ping(usize);
impl Handler < Ping > for MyActor {
type Result = usize ;
///
/// 接受Ping类型的消息 然后返回usize
fn handle (& mut self , msg : Ping , ctx : & mut Self :: Context ) -> Self :: Result {
self .count += msg . 0 ;
self .count
}
}
Copy
发送,接收处理#
/// Starts a `MyActor`, sends it one `Ping`, prints the reply, then stops the system.
#[actix::test]
async fn test1() {
    // `start()` launches the actor and returns its address, roughly what Akka calls an `ActorRef`.
    let actor_addr = MyActor { count: 10 }.start();

    // `send` hands the message to the handler; awaiting yields the reply wrapped in a `Result`.
    let reply = actor_addr.send(Ping(10)).await;
    print!("Res : {} \n ", reply.unwrap());

    let system = System::current();
    print!("id:{} will stop", system.id());
    system.stop();
}
Copy
生命周期函数#
/// Overrides the `started` and `stopped` lifecycle hooks.
///
/// Actor lifecycle states:
/// + Started
/// + Running
/// + Stopping
/// + Stopped
impl Actor for MineActor {
    type Context = Context<Self>;

    /// Prints "started".
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("started");
    }

    /// Prints "stopped".
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("stopped");
    }
}
Copy
可Response的Message#
/// Lets a handler return `Responses` directly by implementing `MessageResponse` for it.
impl<A, M> MessageResponse<A, M> for Responses
where
    A: Actor,
    M: Message<Result = Responses>,
{
    /// Forwards `self` through `tx` when a reply channel was supplied; does nothing otherwise.
    fn handle(
        self,
        _ctx: &mut A::Context,
        tx: Option<actix::dev::OneshotSender<M::Result>>,
    ) {
        if let Some(tx) = tx {
            // `send` reports failure through its return value; discard it explicitly
            // rather than leaving an unused `Result`.
            // NOTE(review): presumably an error only means the requester stopped
            // waiting for the reply — confirm.
            let _ = tx.send(self);
        }
    }
}
Copy
两个Actor互相发的结构#
use actix :: prelude ::*;
use std :: time :: Duration ;
/// Message exchanged between the two `Game` actors; it has no reply value.
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
    /// Sequence number; each forwarded ping carries the received id plus one.
    pub id: usize,
}
/// One participant in the ping exchange.
struct Game {
    /// Number of pings this actor has handled.
    counter: usize,
    /// Label printed whenever a ping is received.
    name: String,
    /// Destination for outgoing pings (another actor).
    recipient: Recipient<Ping>,
}
/// Makes `Game` an actor.
impl Actor for Game {
    type Context = Context<Self>;
}
/// `Ping` handling for `Game`: count the message, then either stop the system or
/// schedule the next ping to the recipient.
impl Handler<Ping> for Game {
    type Result = ();

    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) {
        self.counter += 1;

        // Once more than ten pings have been handled, stop the whole system.
        if self.counter > 10 {
            System::current().stop();
            return;
        }

        println!("[{0}] Ping received {1}", self.name, msg.id);

        // After a 100 ns delay, forward a ping with the next id to the recipient,
        // which in this example is the other `Game` actor.
        let delay = Duration::from_nanos(100);
        ctx.run_later(delay, move |game, _| {
            game.recipient.do_send(Ping { id: msg.id + 1 });
        });
    }
}
Copy
示例互啄术#
/// Runs the two-actor ping exchange.
///
/// Game 1 is built with `Game::create` so that its own address is available while
/// Game 2 is being constructed; each actor ends up holding the other's recipient.
fn main() {
    let system = System::new();

    system.block_on(async {
        Game::create(|ctx| {
            // Address of Game 1 (the actor being created here).
            let game1_addr = ctx.address();

            // Game 2 sends its pings to Game 1.
            let game2_addr = Game {
                counter: 0,
                name: String::from("Game 2"),
                recipient: game1_addr.recipient(),
            }
            .start();

            // Kick things off by delivering the first ping to Game 2.
            game2_addr.do_send(Ping { id: 10 });

            // Game 1 sends its pings to Game 2.
            Game {
                counter: 0,
                name: String::from("Game 1"),
                recipient: game2_addr.recipient(),
            }
        });
    });

    // `run` reports failure through its return value; surface it instead of dropping it.
    if let Err(err) = system.run() {
        eprintln!("system run failed: {}", err);
    }
}
Copy
Arbiter#
// Create the system for this example.
let sys = System :: new ();
// Future that starts `TheActor` once it is executed.
let exec = async {
TheActor . start ();
};
// Use an Arbiter to manage the actor: the future is spawned onto it.
let arbiter = Arbiter :: new ();
Arbiter :: spawn (& arbiter , exec );
// NOTE(review): stop is requested before `sys.run()` is called — presumably so the
// example exits right away; confirm.
System :: current (). stop ();
sys . run ();
Copy
SyncArbiter#
use actix :: prelude ::*;
/// Unit-struct actor that is started through `SyncArbiter`.
struct MySyncActor;

impl Actor for MySyncActor {
    // This actor uses `SyncContext` as its context type.
    type Context = SyncContext<Self>;
}
// With a thread count of 2, two actors can be processing at the same time.
let addr = SyncArbiter :: start ( 2 , || MySyncActor );
Copy
Akka - actor in jvm#
Apache Pekko
import akka.actor.AbstractActor ;
import akka.actor.Props ;
import java.time.Duration ;
/**
 * Actor that replies to String and Integer messages and prints a marker for anything else.
 */
public class DemoRev extends AbstractActor {

    public DemoRev() {
        // Set the timeout for receiving messages to 10 seconds.
        getContext().setReceiveTimeout(Duration.ofSeconds(10));
    }

    /**
     * Props used to create an ActorRef for this actor.
     */
    public static Props props() {
        return Props.create(DemoRev.class, DemoRev::new);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, this::onString)
                .match(Integer.class, this::onInteger)
                .matchAny(this::onOther)
                .build();
    }

    // String message: print it and acknowledge to the sender.
    private void onString(String text) {
        System.out.println("rev :" + text);
        getSender().tell("rev done", getSelf());
    }

    // Integer message: reply to the sender asking for more.
    private void onInteger(Integer ignored) {
        getSender().tell("give me more!", getSelf());
    }

    // Any other message.
    private void onOther(Object ignored) {
        System.out.println("any");
    }
}
Copy
传递消息#
// Create the actor system named "linux".
ActorSystem system = ActorSystem . create ( "linux" );
// Create the two actors.
ActorRef p1 = system . actorOf ( DemoRev . props ());
ActorRef s1 = system . actorOf ( DemoSend . props ());
// s1 -> p1: deliver "hello" to p1 with s1 passed as the sender.
p1 . tell ( "hello" ,s1);
// NOTE(review): terminate is called right after tell — confirm the message is handled before shutdown.
system . terminate ();
Copy
Inbox 消息#
// Create the actor system and the receiving actor.
ActorSystem system = ActorSystem . create ( "linux" );
ActorRef p1 = system . actorOf ( DemoRev . props ());
final Inbox inbox = Inbox . create (system);
// The inbox is itself an actor.
// Send "hello" to p1 through the inbox, then print what the inbox receives (1-second duration passed to receive).
inbox . send (p1, "hello" );
System . out . println ( inbox . receive ( Duration . ofSeconds ( 1 )));
system . terminate ();
Copy
周期性消息#
import akka.actor.AbstractActorWithTimers ;
import java.time.Duration ;
/**
 * Actor that arms a one-shot timer on construction and starts a periodic timer when it fires.
 */
public class DemoTimer extends AbstractActorWithTimers {

    // Key passed to both timers; final because it is a constant and must not be reassigned.
    private static final Object TICK_KEY = "TickKey";

    // Message delivered once by the initial single timer.
    private static final class FirstTick {}

    // Message delivered by the periodic timer.
    private static final class Tick {}

    public DemoTimer() {
        // Comparable to setTimeout: schedule a single FirstTick after 500 ms.
        getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        FirstTick.class,
                        message -> {
                            // Periodic execution: schedule a Tick with a 1-second interval.
                            getTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1));
                        })
                .match(
                        Tick.class,
                        message -> {
                            System.out.println(message);
                        })
                .build();
    }
}
Copy
生命周期#
import akka.actor.AbstractActor ;
import akka.actor.Props ;
/**
 * Actor demonstrating the preStart and postStop lifecycle hooks.
 */
public class StartStopActor1 extends AbstractActor {

    static Props props() {
        return Props.create(StartStopActor1.class, StartStopActor1::new);
    }

    // Start hook: print this actor's path, then create a StartStopActor2 child named "second".
    @Override
    public void preStart() throws Exception {
        final String selfPath = getSelf().path().toSerializationFormat();
        System.out.printf("start %s \n ", selfPath);
        getContext().actorOf(StartStopActor2.props(), "second");
    }

    // Stop hook: print this actor's path.
    @Override
    public void postStop() throws Exception {
        final String selfPath = getSelf().path().toSerializationFormat();
        System.out.printf("stop %s \n ", selfPath);
    }

    /*
     An actor can also be stopped with a signal message:
     victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
    */
    @Override
    public Receive createReceive() {
        // On the exact message "stop", this actor stops itself.
        return receiveBuilder()
                .matchEquals("stop", s -> getContext().stop(getSelf()))
                .build();
    }
}
Copy
Receive#
// 支持动态改变receive方法
public Receive createReceive () {
return receiveBuilder ()
. matchEquals (
"init" ,
m1 -> {
initializeMe = "Up and running" ;
getContext ()
. become (
receiveBuilder ()
. matchEquals (
"U OK?" ,
m2 -> {
getSender (). tell (initializeMe, getSelf ());
})
. build ());
})
. build ();
Copy
ask, pipe#
import akka.actor.ActorRef ;
import akka.actor.ActorSystem ;
import scala.Tuple2 ;
import tech.realcpf.sendrev.DemoRev ;
import tech.realcpf.sendrev.DemoSend ;
import static akka.pattern.Patterns.ask ;
import static akka.pattern.Patterns.pipe ;
import java.time.Duration ;
import java.util.Objects ;
import java.util.concurrent.CompletableFuture ;
/**
 * Asks two actors, combines both replies into a tuple, and pipes the tuple to a third actor.
 */
public class AskDemo {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sys");
        ActorRef actorA = system.actorOf(DemoRev.props());
        ActorRef actorB = system.actorOf(DemoRev.props());
        ActorRef actorC = system.actorOf(DemoRev.props());

        // Ask A and B, each with a 1000 ms timeout.
        CompletableFuture<Object> future1 =
                ask(actorA, "hi A", Duration.ofMillis(1000)).toCompletableFuture();
        CompletableFuture<Object> future2 =
                ask(actorB, "hi B", Duration.ofMillis(1000)).toCompletableFuture();

        // Once both futures are complete, pair their results.
        CompletableFuture<Tuple2<String, String>> transformed =
                CompletableFuture.allOf(future1, future2)
                        .thenApply(v -> {
                            String x = (String) future1.join();
                            String s = (String) future2.join();
                            // Diamond instead of the raw Tuple2 type, so the result is type-checked.
                            return new Tuple2<>(x, s);
                        });

        // Deliver the combined result to actorC.
        pipe(transformed, system.dispatcher()).to(actorC);

        // NOTE(review): terminate is called without waiting on `transformed` — confirm
        // the replies arrive before shutdown.
        system.terminate();
    }
}
Copy