Akka Typed新特性一览

Hello Scala!

Akka Typed Actor从2.4开始直到2.5可以商用,进而Akka 2.6已经把Akka Typed Actor做为推荐的Actor使用模式。Typed Actor与原先的Untyped Actor最大的区别Actor有类型了,其签名也改成了akka.actor.typed.ActorRef[T]。通过一个简单的示例来看看在Akka Typed环境下怎样使用Actor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sealed trait Command
final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command
final case class Tell(message: String) extends Command

sealed trait Reply
final case class HelloReply(message: String) extends Reply

def apply(): Behavior[Command] = Behaviors.setup { context =>
Behaviors.receiveMessage {
case Hello(message, replyTo) =>
replyTo ! HelloReply(s"$message, scala!")
Behaviors.same
case Tell(message) =>
context.log.debug("收到消息:{}", message)
Behaviors.same
}
}

Akka Typed不再需要通过类的形式来实现Actor接口定义,而是函数的形式来定义actor。可以看到,定义的actor类型为Behavior[T]形为),通过Behaviors.receiveMessage[T](T => Behavior[T]): Receive[T]函数来处理接收到的消息,而Receive继承了Behavior trait。通过函数签名可以看到,每次接收到消息并对其处理完成后,都必需要返回一个新的形为。

apply(): Behavior[Command]函数签名里的范性参数类型Command限制了这个actor将只接收CommandCommand子类型的消息,编译器将在编译期对传给actor的消息做类型检查,相对于从前的untyped actor可以向actor传入任何类型的消息,这可以限制的减少程序中的bug。特别是在程序规模很大,当你定义了成百上千个消息时。

也因为有类型的actor,在Akka Typed中没有了隐式发送的sender: ActorRef,必需在发送的消息里面包含回复字段,就如Hello消息定义里的replyTo: ActorRef[Reply]字段一样。actor在处理完Hello消息后可以通过它向发送者回复处理结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class HelloScalaSpec extends ScalaTestWithActorTestKit with WordSpecLike {
"HelloScala" should {
"tell" in {
val actorRef = spawn(HelloScala(), "tell")
actorRef ! Tell("Hello")
}

"replyTo" in {
val actorRef = spawn(HelloScala(), "replyTo")
val probe = createTestProbe[Reply]()
actorRef ! Hello("hello", probe.ref)

probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!"))
}

"ask" in {
import akka.actor.typed.scaladsl.AskPattern._
val actorRef = spawn(HelloScala(), "ask")
val reply = actorRef
.ask[Reply](replyTo => Hello("Hello", replyTo))
.mapTo[HelloReply]
.futureValue
reply.message should be("Hello, scala!")
}
}
}

更复杂的一个示例

上一个示例简单的演示了Akka Typed Actor的功能和基本使用方式,接下来看一个更复杂的示例,将展示Akka Typed更多的特性及功能。

首先是消息定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sealed trait Command
trait ControlCommand extends Command { val clientId: String }
trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }

final case class Connect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
final case class Disconnect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
final case class QueryResource(clientId: String, replyTo: ActorRef[Reply]) extends ReplyCommand
final private[typed] case object SessionTimeout extends Command
final private case class ServiceKeyRegistered(registered: Receptionist.Registered) extends Command

sealed trait Reply
final case class Connected(status: Int, clientId: String) extends Reply
final case class Disconnected(status: Int, clientId: String) extends Reply
final case class ResourceQueried(status: Int, clientId: String, resources: Seq[String]) extends Reply
final case class ReplyError(status: Int) extends Reply

上面分别定义了actor可接收的请求消息:Command和返回结果消息:Reply。建议对于需要返回值的消息使用:replyTo来命名收受返回值的actor字段,这里也可以不定义Reply trait来做为统一的返回值类型,可以直接返回结果类型,如:ActorRef[String

这里将定义两个actor,一个做为父actor,一个做为子actor。父actor为:ComplexActor,管理连接客户端和转发消息到子actor,每次有新的客户端连接上来时做以客户端clientId做为名字创建一个子actor;子actor:ComplexClient,保持客户端连接会话,处理消息……

ComplexActor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class ComplexActor private(context: ActorContext[ComplexActor.Command]) {
import ComplexActor._
private var connects = Map.empty[String, ActorRef[Command]]

def init(): Behavior[Command] = Behaviors.receiveMessage {
case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) =>
context.log.info("Actor be registered, serviceKey: {}", serviceKey)
receive()
....
}

def receive(): Behavior[Command] =
Behaviors
.receiveMessage[Command] {
case cmd @ Connect(clientId, replyTo) =>
if (connects.contains(clientId)) {
replyTo ! Connected(IntStatus.CONFLICT, clientId)
} else {
val child = context.spawn(
Behaviors
.supervise(ComplexClient(clientId))
.onFailure(SupervisorStrategy.restart),
clientId)
context.watch(child)
connects = connects.updated(clientId, child)
child ! cmd
}
Behaviors.same
....
}
.receiveSignal {
case (_, Terminated(child)) =>
val clientId = child.path.name
connects -= clientId
context.unwatch(child)
Behaviors.same
}
}

ComplexActor在收到Connect消息后将首先判断请求客户端ID(clientId)是否已经连接,若重复连接将直接返回409错误(Connected(IntStatus.CONFLICT, _))。若是一个新连接将调用context.spawn函数在创建一个字actor:ComplexClientspawn函数签名如下:

1
def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]

behavior是要创建的actor,name为子actor的名字,需要保证在同一级内唯一(兄弟之间),props可对actor作一些自定义,如:线程执行器(Dispatcher)、邮箱等。

receiveSignal用于接收系统控制信号消息,经典actor的preRestartpostStop回调函数(将分别做为PreRestartPostStop信号),以及Terminated消息都将做为信号发送到这里。

ComplexClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final class ComplexClient private (
clientId: String,
context: ActorContext[ComplexActor.Command]) {
import ComplexActor._

def active(): Behavior[Command] = Behaviors.receiveMessagePartial {
....
case SessionTimeout =>
context.log.warn("Inactive timeout, stop!")
Behaviors.stopped
}

def init(): Behavior[Command] = Behaviors.receiveMessage {
case Connect(`clientId`, replyTo) =>
replyTo ! Connected(IntStatus.OK, clientId)
context.setReceiveTimeout(120.seconds, SessionTimeout)
active()
case other =>
context.log.warn("Receive invalid command: {}", other)
Behaviors.same
}

ComplexClient定义了两个形为函数,init()active。当客户端连接成功以后会返回active()函数作为actor新的形为来接收之后的消息。这种返回一个新的Behavior函数的形式替代了经典actor里的becomeunbecome函数,它更直观,甚至还可以使用这种方式来实现状态机

context.setReceiveTimeout(120.seconds, SessionTimeout)用来设置两次消息接收之间的超时时间,这里设备为120秒。可以通过方式来实现服务端会话(session)超时判断,当session超时时返回Behaviors.stopped消息来停止actor(自己)。这里需要注意的是context.stop只能用来停止直接子actor,停止actor自身返回stopped形为即可,这与经典actor有着明显的区别。

发现actor

Akka Typed取消了actorSelection函数,不再允许通过actor path路径来查找ActorRef。取而代之的是使用Receptionist机制来注册服务(actor实例)。也就是说,在Akka Typed中,actor默认情况下是不能查找的,只能通过引用(ActorRef[T])来使用,要么actor之间具有父子关系,要么通过消息传递ActorRef[T]……

1
2
3
4
5
6
7
8
9
object ComplexActor {
val serviceKey = ServiceKey[Command]("complex")

def apply(): Behavior[Command] = Behaviors.setup { context =>
val registerAdapter = context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))
context.system.receptionist ! Receptionist.Register(serviceKey, context.self, registerAdapter)
new ComplexActor(context).init()
}
}

上面代码通过Receptionist.Register将actor(context.self引用)以serviceKey注册到Actor系统的receptionist表,之后就可以通过serviceKey来发现并获取此actor的引用。

1
2
3
4
5
6
7
8
val actorRef: ActorRef[ComplexActor.Command] = system.receptionist
.ask[Receptionist.Listing](Receptionist.Find(ComplexActor.serviceKey))
.map { listing =>
if (listing.isForKey(serviceKey))
listing.serviceInstances(serviceKey).head
else
throw new IllegalAccessException(s"Actor reference not found: $serviceKey")
}

消息适配器

有时候,需要将不匹配的消息发送给actor,比如:把receptionist服务注册结果 Receptionist.Registered发送给一个actor,我们可以通过将消息包装到一个实现了Command trait的case class来实现。如下面的代码示例:

1
2
val registerAdapter: ActorRef[Receptionist.Registered] =
context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))

在使用Receptionist.Register时将registerAdapter作为第3个参数传入,这样服务注册结果就将被包装成ServiceKeyRegistered消息传给actor。

在actor内部处理异步任务

actor内部消息都是串行执行的,在actor内执行异步操作时需要小心。不能在Future的回调函数里直接操作actor内部变量,因为它们很可能在两个不同的线程中。

可以通过context.pipeToSelf将异步结果转换成一个消息传递给actor,这样异步结果将进入actor的邮箱列队,通过正确的消息处理机制来处理。

1
2
3
4
5
6
7
8
9
case QueryResource(_, replyTo) =>
context.pipeToSelf(findExternalResource())(value => InternalQueryResource(value, replyTo))
Behaviors.same

case InternalQueryResource(tryValue, replyTo) =>
replyTo ! tryValue
.map(ResourceQueried(IntStatus.OK, clientId, _))
.getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil))
Behaviors.same

在ActorSystem[_]外部创建actor

Akka Typed开始,ActorSystem[T]也拥有一个泛型参数,在构造ActorSystem时需要传入一个默认Behavior[T],并将其作为经典actor下的user守卫(也就类似拥有akka://system-name/user这个路径的actor),同时ActorSystem[T]actorOf函数也被取消。Akka Typed推荐应用都从传给ActorSystem的默认Behavior[T]开始构建actor树。但有时,也许通过ActorSystem[T]的实例来创建actor是有意义的,可以通过将typed的ActorSystem[T]转换成经典的untyped ActorSystem来实现。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
implicit val timeout = Timeout(2.seconds)
implicit val system: ActorSystem[_] = _ // ....

val spawnActor: ActorRef[SpawnProtocol.Command] = system.toClassic
.actorOf(
PropsAdapter(Behaviors.supervise(SpawnProtocol())
.onFailure(SupervisorStrategy.resume)), "spawn")
.toTyped[SpawnProtocol.Command]

val helloScalaF: Future[ActorRef[HelloScala.Command]] =
spawnActor.ask[ActorRef[HelloScala.Command]](replyTo =>
SpawnProtocol.Spawn(HelloScala(), "sample", Props.empty, replyTo))

val helloScala: ActorRef[HelloScala.Command] = Await.result(helloScalaF, 2.seconds)

也可以将SpawnProtocol()作为ActorSystem[_]的初始Behavior[T]来构造ActorSystem,这样就可以通过system.ask[ActorRef[T]](SpawnProtocol.Spawn(....))来创建在user守卫下的actor了。

小结

本文通过两个例子展示了Akka Typed的特性,它与经典actor的区别还是挺大的。从untyped和typed,actor拥有了类型,这对于大规模actor系统开发可以在编译期发现很多重复,它将强制你在设计actor时首先考虑消息的定义。定义的消息即是actor之间的数据交互协议,消息定义的过程也是业务模式和模块划分的过程。

完整示例代码

分享到