Akka Typed Actor
Hello Scala!
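Akka Typed Actor first appeared as an experimental module in Akka 2.4, became usable in production during the 2.5 series, and as of Akka 2.6 is the recommended way to write actors. The biggest difference from the original untyped actors is that an actor now has a type: its reference type is akka.actor.typed.ActorRef[T]. A simple example shows how to use actors in Akka Typed.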
sealed trait Command
final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command
final case class Tell(message: String) extends Command

sealed trait Reply
final case class HelloReply(message: String) extends Reply

def apply(): Behavior[Command] = Behaviors.setup { context =>
  Behaviors.receiveMessage {
    case Hello(message, replyTo) =>
      replyTo ! HelloReply(s"$message, scala!")
      Behaviors.same
    case Tell(message) =>
      context.log.debug("Received message: {}", message)
      Behaviors.same
  }
}
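Akka Typed no longer requires you to implement an Actor interface as a class; an actor is defined as a function instead. As the example shows, the actor being defined has the type Behavior[T] (**behavior**), and incoming messages are handled by Behaviors.receiveMessage[T](onMessage: T => Behavior[T]): Receive[T], where Receive extends Behavior. The signature shows that after handling each message the function must return the behavior to use for the next message: either a new behavior, or Behaviors.same to keep the current one.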
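The generic type parameter Command in the signature apply(): Behavior[Command] restricts this actor to messages of type Command or its subtypes. The compiler type-checks every message sent to the actor at compile time. Compared with untyped actors, which accept messages of any type, this can greatly reduce bugs, especially in a large program that defines hundreds or thousands of messages.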
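Because actors are typed, Akka Typed no longer has the implicit sender: ActorRef. A message that expects a reply must carry the reply address itself, like the replyTo: ActorRef[Reply] field in the definition of Hello. After handling a Hello message, the actor uses that field to send the result back to the sender.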
"tell" in {
  val actorRef = spawn(HelloScala(), "tell")
  actorRef ! Tell("Hello")
}

"replyTo" in {
  val actorRef = spawn(HelloScala(), "replyTo")
  val probe = createTestProbe[Reply]()
  actorRef ! Hello("hello", probe.ref)
  probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!"))
}

"ask" in {
  import akka.actor.typed.scaladsl.AskPattern._
  val actorRef = spawn(HelloScala(), "ask")
  val reply =
    actorRef
      .ask[Reply](replyTo => Hello("Hello", replyTo))
      .mapTo[HelloReply]
      .futureValue
  reply.message should be("Hello, scala!")
}
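Outside the test kit, the same ask interaction can be driven from ordinary code. The following is a minimal sketch, assuming Akka 2.6 and using HelloScala itself as the guardian behavior. HelloScalaMain and askOnce are hypothetical names introduced for illustration, and the actor is trimmed to the Hello message only.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object HelloScala {
  sealed trait Command
  final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command

  sealed trait Reply
  final case class HelloReply(message: String) extends Reply

  def apply(): Behavior[Command] = Behaviors.receiveMessage {
    case Hello(message, replyTo) =>
      replyTo ! HelloReply(s"$message, scala!")
      Behaviors.same
  }
}

object HelloScalaMain {
  // Hypothetical helper: starts a system, asks once, returns the reply text,
  // and always terminates the system afterwards.
  def askOnce(text: String): String = {
    implicit val system: ActorSystem[HelloScala.Command] =
      ActorSystem(HelloScala(), "hello")
    implicit val timeout: Timeout = 3.seconds
    try {
      // ask creates a temporary ActorRef[Reply] and passes it in as replyTo.
      val reply = system.ask[HelloScala.Reply](replyTo => HelloScala.Hello(text, replyTo))
      Await.result(reply, 3.seconds) match {
        case HelloScala.HelloReply(message) => message
      }
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(askOnce("Hello"))
}
```

Blocking with Await is only acceptable here because the caller is not an actor; inside an actor the reply would be handled as a message instead.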
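A more complex example
The previous example briefly demonstrated the basic features and usage of Akka Typed actors. The next one is more complex and shows more of what Akka Typed can do.
First, the message definitions: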
sealed trait Command
trait ControlCommand extends Command { val clientId: String }
trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }
final case class Connect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
final case class Disconnect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
final case class QueryResource(clientId: String, replyTo: ActorRef[Reply]) extends ReplyCommand
final private[typed] case object SessionTimeout extends Command
final private case class ServiceKeyRegistered(registered: Receptionist.Registered) extends Command
sealed trait Reply
final case class Connected(status: Int, clientId: String) extends Reply
final case class Disconnected(status: Int, clientId: String) extends Reply
final case class ResourceQueried(status: Int, clientId: String, resources: Seq[String]) extends Reply
final case class ReplyError(status: Int) extends Reply
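The code above defines the request messages the actor accepts (Command) and the result messages it returns (Reply). For messages that need a response, it is a good idea to name the field holding the receiving actor replyTo. You also do not have to define a Reply trait as a common response type; a message can be answered with the result type directly, for example through an ActorRef[String].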
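Two actors are defined here, a parent and a child. The parent, ComplexActor, manages client connections and forwards messages to the children; each time a new client connects, it creates a child actor named after the client's clientId. The child, ComplexClient, holds the client's session, handles its messages, and so on.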
ComplexActor
final class ComplexActor private (context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._

  private var connects = Map.empty[String, ActorRef[Command]]

  def init(): Behavior[Command] = Behaviors.receiveMessage {
    case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) =>
      context.log.info("Actor be registered, serviceKey: {}", serviceKey)
      receive()
    ....
  }

  def receive(): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case cmd @ Connect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            replyTo ! Connected(IntStatus.CONFLICT, clientId)
          } else {
            val child = context.spawn(
              Behaviors
                .supervise(ComplexClient(clientId))
                .onFailure(SupervisorStrategy.restart),
              clientId)
            context.watch(child)
            connects = connects.updated(clientId, child)
            child ! cmd
          }
          Behaviors.same
        ....
      }
      .receiveSignal {
        case (_, Terminated(child)) =>
          val clientId = child.path.name
          connects -= clientId
          context.unwatch(child)
          Behaviors.same
      }
}
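When ComplexActor receives a Connect message, it first checks whether the requesting client ID (clientId) is already connected; a duplicate connection is answered immediately with a 409 error (Connected(IntStatus.CONFLICT, _)). For a new connection it calls context.spawn to create a child actor, ComplexClient. The signature of spawn is: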
def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]
behavior is the actor to create; name is the child's name, which must be unique among its siblings; props customizes the actor, for example its dispatcher (Dispatcher) or its mailbox.
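As an illustration of the props parameter, the sketch below spawns two children with different settings. It assumes Akka 2.6, where DispatcherSelector and MailboxSelector are both Props values; SpawnWithProps and the worker behavior are hypothetical names used only for this example.

```scala
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector, MailboxSelector}
import akka.actor.typed.scaladsl.Behaviors

object SpawnWithProps {
  // Hypothetical worker that ignores every message it receives.
  val worker: Behavior[String] = Behaviors.ignore[String]

  // Parent that forwards each message to two differently configured children.
  def apply(): Behavior[String] = Behaviors.setup { context =>
    // Runs on Akka's default blocking-IO dispatcher instead of the default one.
    val blocking: ActorRef[String] =
      context.spawn(worker, "blocking-worker", DispatcherSelector.blocking())
    // Uses a bounded mailbox with room for 100 messages.
    val bounded: ActorRef[String] =
      context.spawn(worker, "bounded-worker", MailboxSelector.bounded(100))

    Behaviors.receiveMessage { msg =>
      blocking ! msg
      bounded ! msg
      Behaviors.same
    }
  }
}
```

The two child names differ because names must be unique among siblings; spawning a second child under an existing name fails.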
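receiveSignal receives system control signals. The preRestart and postStop callbacks of classic actors arrive here as the PreRestart and PostStop signals, and the Terminated message is delivered here as a signal as well.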
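To make the PostStop signal concrete, here is a small sketch (assuming Akka 2.6) of an actor that stops itself and runs cleanup in its signal handler. Resource, Shutdown, ResourceDemo and the released promise are hypothetical names for illustration; the promise exists only so that code outside the actor can observe the cleanup.

```scala
import akka.actor.typed.{ActorSystem, Behavior, PostStop}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object Resource {
  sealed trait Command
  case object Shutdown extends Command

  def apply(released: Promise[String]): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case Shutdown => Behaviors.stopped // the actor stops itself
      }
      .receiveSignal {
        // Delivered once the actor has stopped; the place for cleanup
        // that a classic actor would do in postStop.
        case (_, PostStop) =>
          released.trySuccess("released")
          Behaviors.same
      }
}

object ResourceDemo {
  def run(): String = {
    val released = Promise[String]()
    // Resource is the guardian here, so stopping it also terminates the system.
    val system = ActorSystem(Resource(released), "signals")
    system ! Resource.Shutdown
    Await.result(released.future, 5.seconds)
  }
}
```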
ComplexClient
final class ComplexClient private (
    clientId: String,
    context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._

  def active(): Behavior[Command] = Behaviors.receiveMessagePartial {
    ....
    case SessionTimeout =>
      context.log.warn("Inactive timeout, stop!")
      Behaviors.stopped
  }

  def init(): Behavior[Command] = Behaviors.receiveMessage {
    // Backticks match only a Connect whose clientId equals this actor's clientId.
    case Connect(`clientId`, replyTo) =>
      replyTo ! Connected(IntStatus.OK, clientId)
      context.setReceiveTimeout(120.seconds, SessionTimeout)
      active()
    case other =>
      context.log.warn("Receive invalid command: {}", other)
      Behaviors.same
  }
}
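ComplexClient defines two behavior functions, init() and active(). Once the client has connected successfully, init() returns active() as the actor's new behavior for all subsequent messages. Returning a new Behavior this way replaces the become and unbecome functions of classic actors; it is more intuitive and can even be used to implement a **state machine**.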
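context.setReceiveTimeout(120.seconds, SessionTimeout) sets the maximum idle time between two received messages, 120 seconds here. This gives a simple way to detect server-side session timeouts: when the session times out, the actor returns Behaviors.stopped to stop itself. Note that context.stop can only stop direct children; an actor stops itself by returning the stopped behavior, which is a clear difference from classic actors.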
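The state machine idea can be sketched with a two-state actor in which each state is its own behavior function. This is an illustrative example assuming Akka 2.6; Door, its messages and DoorDemo are hypothetical names that do not appear in the article's code.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object Door {
  sealed trait Command
  case object Open extends Command
  case object Close extends Command
  final case class GetState(replyTo: ActorRef[String]) extends Command

  // The door starts in the closed state.
  def apply(): Behavior[Command] = closed

  // State "closed": Open switches to `opened`, Close is a no-op.
  private def closed: Behavior[Command] = Behaviors.receiveMessage {
    case Open  => opened
    case Close => Behaviors.same
    case GetState(replyTo) =>
      replyTo ! "closed"
      Behaviors.same
  }

  // State "opened": Close switches back to `closed`, Open is a no-op.
  private def opened: Behavior[Command] = Behaviors.receiveMessage {
    case Close => closed
    case Open  => Behaviors.same
    case GetState(replyTo) =>
      replyTo ! "opened"
      Behaviors.same
  }
}

object DoorDemo {
  // Sends Open, then asks for the current state.
  def stateAfterOpen(): String = {
    implicit val system: ActorSystem[Door.Command] = ActorSystem(Door(), "door")
    implicit val timeout: Timeout = 3.seconds
    try {
      system ! Door.Open
      Await.result(system.ask[String](replyTo => Door.GetState(replyTo)), 3.seconds)
    } finally system.terminate()
  }
}
```

Each transition is simply the return value of the message handler, so the set of reachable states is visible in the code without any mutable flag.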
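Discovering actors
Akka Typed drops the actorSelection function, so an ActorRef can no longer be looked up by actor path. Instead, services (actor instances) are registered through the Receptionist mechanism. In other words, actors in Akka Typed cannot be looked up by default and can only be used through a reference (ActorRef[T]): either the actors are parent and child, or the ActorRef[T] is passed along in a message.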
object ComplexActor {
  val serviceKey = ServiceKey[Command]("complex")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    val registerAdapter =
      context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))
    context.system.receptionist ! Receptionist.Register(serviceKey, context.self, registerAdapter)
    new ComplexActor(context).init()
  }
}
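The code above uses Receptionist.Register to register the actor (its context.self reference) under serviceKey in the actor system's **receptionist**. After that, the actor can be discovered and its reference obtained through serviceKey.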
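// ask returns a Future, so the lookup result is a Future[ActorRef[...]].
// Requires AskPattern._ plus an implicit Timeout, Scheduler and ExecutionContext in scope.
val actorRefF: Future[ActorRef[ComplexActor.Command]] = system.receptionist
  .ask[Receptionist.Listing](Receptionist.Find(ComplexActor.serviceKey))
  .map { listing =>
    if (listing.isForKey(ComplexActor.serviceKey))
      listing.serviceInstances(ComplexActor.serviceKey).head
    else
      throw new IllegalAccessException(s"Actor reference not found: ${ComplexActor.serviceKey}")
  }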
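Message adapters
Sometimes an actor needs to receive a message that does not match its protocol, for example the receptionist's registration result, Receptionist.Registered. This can be done by wrapping the message in a case class that implements the Command trait, as in the following code: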
val registerAdapter: ActorRef[Receptionist.Registered] =
  context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))
When sending Receptionist.Register, pass registerAdapter as the third argument; the registration result is then wrapped in a ServiceKeyRegistered message and delivered to the actor.
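The same technique works for any foreign protocol, not only the receptionist. The sketch below, assuming Akka 2.6, adapts the reply of a separate actor into the receiving actor's own Command type. Translator, Frontend, WrappedTranslated and AdapterDemo are hypothetical names made up for this illustration.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical backend with its own protocol, unrelated to Frontend.Command.
object Translator {
  final case class Translate(text: String, replyTo: ActorRef[Translated])
  final case class Translated(text: String)

  def apply(): Behavior[Translate] = Behaviors.receiveMessage {
    case Translate(text, replyTo) =>
      replyTo ! Translated(text.toUpperCase)
      Behaviors.same
  }
}

object Frontend {
  sealed trait Command
  final case class Submit(text: String) extends Command
  // Internal wrapper that brings the foreign reply into Frontend's protocol.
  private final case class WrappedTranslated(result: Translator.Translated) extends Command

  def apply(
      translator: ActorRef[Translator.Translate],
      listener: ActorRef[String]): Behavior[Command] =
    Behaviors.setup { context =>
      // The adapter is an ActorRef[Translated] that wraps and forwards to context.self.
      val adapter: ActorRef[Translator.Translated] =
        context.messageAdapter[Translator.Translated](rsp => WrappedTranslated(rsp))

      Behaviors.receiveMessage {
        case Submit(text) =>
          translator ! Translator.Translate(text, adapter)
          Behaviors.same
        case WrappedTranslated(result) =>
          listener ! result.text
          Behaviors.same
      }
    }
}

object AdapterDemo {
  // Guardian that wires both actors together and captures the first result.
  def guardian(result: Promise[String]): Behavior[String] = Behaviors.setup { context =>
    val translator = context.spawn(Translator(), "translator")
    val frontend = context.spawn(Frontend(translator, context.self), "frontend")
    frontend ! Frontend.Submit("hello")
    Behaviors.receiveMessage { text =>
      result.trySuccess(text)
      Behaviors.stopped
    }
  }

  def run(): String = {
    val result = Promise[String]()
    ActorSystem(guardian(result), "adapter")
    Await.result(result.future, 5.seconds)
  }
}
```

Keeping the wrapper private means other actors cannot send it directly, so the adapter stays an implementation detail of Frontend.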
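Handling asynchronous tasks inside an actor
An actor processes its messages one at a time, so asynchronous operations inside an actor need care. Do not touch the actor's internal state directly from a Future callback, because the callback is likely to run on a different thread from the actor.
context.pipeToSelf turns the asynchronous result into a message sent to the actor itself, so the result goes through the actor's mailbox and is handled by the normal message-processing path.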
case QueryResource(_, replyTo) =>
  context.pipeToSelf(findExternalResource())(value => InternalQueryResource(value, replyTo))
  Behaviors.same
case InternalQueryResource(tryValue, replyTo) =>
  replyTo ! tryValue
    .map(ResourceQueried(IntStatus.OK, clientId, _))
    .getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil))
  Behaviors.same
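Creating actors from outside ActorSystem[_]
Starting with Akka Typed, ActorSystem[T] also takes a type parameter. Constructing an ActorSystem requires a default Behavior[T], which takes the place of the classic user guardian (the actor at the path akka://system-name/user). The actorOf function on ActorSystem[T] has also been removed. Akka Typed recommends that applications build their actor tree starting from the default Behavior[T] passed to the ActorSystem. Still, creating an actor through the ActorSystem[T] instance sometimes makes sense, and this can be done by converting the typed ActorSystem[T] to a classic, untyped ActorSystem: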
import akka.actor.typed.scaladsl.AskPattern._ // provides ask
import akka.actor.typed.scaladsl.adapter._ // provides toClassic, toTyped and PropsAdapter

implicit val timeout = Timeout(2.seconds)
implicit val system: ActorSystem[_] = _ // ....

val spawnActor: ActorRef[SpawnProtocol.Command] = system.toClassic
  .actorOf(
    PropsAdapter(
      Behaviors
        .supervise(SpawnProtocol())
        .onFailure(SupervisorStrategy.resume)),
    "spawn")
  .toTyped[SpawnProtocol.Command]

val helloScalaF: Future[ActorRef[HelloScala.Command]] =
  spawnActor.ask[ActorRef[HelloScala.Command]](replyTo =>
    SpawnProtocol.Spawn(HelloScala(), "sample", Props.empty, replyTo))
val helloScala: ActorRef[HelloScala.Command] = Await.result(helloScalaF, 2.seconds)
Alternatively, SpawnProtocol() can be used as the initial Behavior[T] of the ActorSystem[_]; actors under the user guardian can then be created with system.ask[ActorRef[T]](SpawnProtocol.Spawn(....)).
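A minimal sketch of that second approach, assuming Akka 2.6, is shown below. SpawnFromOutside, spawnGreeter and the greeter behavior are hypothetical names for illustration; the greeter simply ignores its messages and stands in for any behavior you want to start.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Props, SpawnProtocol}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SpawnFromOutside {
  // Hypothetical stand-in for the behavior to be started.
  val greeter: Behavior[String] = Behaviors.ignore[String]

  // Spawns the greeter through the guardian and returns its actor path.
  def spawnGreeter(): String = {
    // The guardian speaks SpawnProtocol, so the system itself accepts Spawn messages.
    implicit val system: ActorSystem[SpawnProtocol.Command] =
      ActorSystem(SpawnProtocol(), "spawner")
    implicit val timeout: Timeout = 3.seconds
    try {
      val refF: Future[ActorRef[String]] =
        system.ask[ActorRef[String]](replyTo =>
          SpawnProtocol.Spawn(greeter, "greeter", Props.empty, replyTo))
      // The new actor is a child of the guardian, so it lives under /user.
      Await.result(refF, 3.seconds).path.toString
    } finally system.terminate()
  }
}
```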
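Summary
This article used two examples to show the features of Akka Typed, which differs considerably from classic actors. Moving from untyped to typed gives actors a type, which lets the compiler catch many mistakes when building large actor systems, and it forces you to think about message definitions first when designing an actor. The messages you define are the data exchange protocol between actors, and defining them is also a process of working out the business model and the module boundaries.
Complete example code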
HelloScala.scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object HelloScala {
  sealed trait Command
  final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command
  final case class Tell(message: String) extends Command

  sealed trait Reply
  final case class HelloReply(message: String) extends Reply

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    Behaviors.receiveMessage {
      case Hello(message, replyTo) =>
        replyTo ! HelloReply(s"$message, scala!")
        Behaviors.same
      case Tell(message) =>
        context.log.debug("Received message: {}", message)
        Behaviors.same
    }
  }
}
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike

class HelloScalaSpec extends ScalaTestWithActorTestKit with WordSpecLike {
  import HelloScala._

  "HelloScala" should {
    "tell" in {
      val actorRef = spawn(HelloScala(), "tell")
      actorRef ! Tell("Hello")
    }

    "replyTo" in {
      val actorRef = spawn(HelloScala(), "replyTo")
      val probe = createTestProbe[Reply]()
      actorRef ! Hello("hello", probe.ref)
      probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!"))
    }

    "ask" in {
      import akka.actor.typed.scaladsl.AskPattern._
      val actorRef = spawn(HelloScala(), "ask")
      val reply =
        actorRef
          .ask[Reply](replyTo => Hello("Hello", replyTo))
          .mapTo[HelloReply]
          .futureValue
      reply.message should be("Hello, scala!")
    }
  }
}
ComplexActor.scala
import java.util.concurrent.TimeUnit

import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy, Terminated }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, TimerScheduler }
import akka.actor.typed.scaladsl.Behaviors.Receive

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
// IntStatus (status-code constants) is a project helper that is not shown here.

object ComplexActor {
  sealed trait Command
  trait ControlCommand extends Command { val clientId: String }
  trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }

  final case class Connect(clientId: String, replyTo: ActorRef[Reply])
      extends ControlCommand
      with ReplyCommand
  final case class Disconnect(clientId: String, replyTo: ActorRef[Reply])
      extends ControlCommand
      with ReplyCommand
  final case class AskMessage(
      clientId: String,
      message: String,
      replyTo: ActorRef[Reply])
      extends ReplyCommand
  final case class ConnectCount(replyTo: ActorRef[Reply]) extends ReplyCommand
  final case class QueryResource(clientId: String, replyTo: ActorRef[Reply])
      extends ReplyCommand
  final case class PublishEvent(clientId: String, event: String, payload: String)
      extends Command
  final private[typed] case object SessionTimeout extends Command
  final private case class ServiceKeyRegistered(
      registered: Receptionist.Registered)
      extends Command

  sealed trait Reply
  final case class Connected(status: Int, clientId: String) extends Reply
  final case class Disconnected(status: Int, clientId: String) extends Reply
  final case class MessageAsked(status: Int, clientId: String, reply: String)
      extends Reply
  final case class ConnectCounted(count: Int, status: Int = IntStatus.OK)
      extends Reply
  final case class ResourceQueried(
      status: Int,
      clientId: String,
      resources: Seq[String])
      extends Reply
  final case class ReplyError(status: Int) extends Reply

  val serviceKey = ServiceKey[Command]("complex")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    val registerAdapter =
      context.messageAdapter[Receptionist.Registered](value =>
        ServiceKeyRegistered(value))
    context.system.receptionist ! Receptionist
      .Register(serviceKey, context.self, registerAdapter)
    new ComplexActor(context).init()
  }
}
final class ComplexActor(context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._

  private var connects = Map.empty[String, ActorRef[Command]]

  // Behavior used until the receptionist confirms the registration.
  def init(): Receive[Command] =
    Behaviors.receiveMessage[Command] {
      case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) =>
        context.log.info("Actor be registered, serviceKey: {}", serviceKey)
        receive()
      case cmd: ReplyCommand =>
        cmd.replyTo ! ReplyError(IntStatus.SERVICE_UNAVAILABLE)
        Behaviors.same
      case other =>
        context.log
          .warn("Actor not registered, receive invalid message: {}", other)
        Behaviors.same
    }

  // Normal behavior: manage client connections and forward messages to children.
  def receive(): Behavior[Command] =
    Behaviors
      .receiveMessagePartial[Command] {
        case cmd @ Connect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            replyTo ! Connected(IntStatus.CONFLICT, clientId)
          } else {
            val child = context.spawn(
              Behaviors
                .supervise(
                  ComplexClient(clientId, context.self.narrow[ControlCommand]))
                .onFailure(SupervisorStrategy.restart),
              clientId)
            context.watch(child)
            connects = connects.updated(clientId, child)
            child ! cmd
          }
          Behaviors.same

        case cmd @ Disconnect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            connects(clientId) ! cmd
          } else {
            replyTo ! Disconnected(IntStatus.NOT_FOUND, clientId)
          }
          Behaviors.same

        case cmd: AskMessage =>
          connects.get(cmd.clientId) match {
            case Some(ref) => ref ! cmd
            case None      => cmd.replyTo ! ReplyError(IntStatus.NOT_FOUND)
          }
          Behaviors.same

        case event: PublishEvent =>
          connects.get(event.clientId).foreach(_ ! event)
          Behaviors.same

        case ConnectCount(replyTo) =>
          replyTo ! ConnectCounted(connects.size)
          Behaviors.same

        case cmd: QueryResource =>
          connects.get(cmd.clientId) match {
            case Some(ref) => ref ! cmd
            case None      => cmd.replyTo ! ReplyError(IntStatus.NOT_FOUND)
          }
          Behaviors.same
      }
      .receiveSignal {
        // A watched child has stopped: forget its connection.
        case (_, Terminated(child)) =>
          val clientId = child.path.name
          connects -= clientId
          context.unwatch(child)
          Behaviors.same
      }
}
object ComplexClient {
  import ComplexActor._

  final private[typed] case class InternalQueryResource(
      resources: Try[Seq[String]],
      replyTo: ActorRef[Reply])
      extends Command

  def apply(
      clientId: String,
      parent: ActorRef[ControlCommand]): Behavior[Command] =
    Behaviors.setup { context =>
      Behaviors.withTimers(timers =>
        new ComplexClient(clientId, parent, timers, context).init())
    }
}
final class ComplexClient private (
    clientId: String,
    parent: ActorRef[ComplexActor.ControlCommand],
    timers: TimerScheduler[ComplexActor.Command],
    context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._
  import ComplexClient._
  import context.executionContext

  // Behavior after a successful Connect.
  def active(): Behavior[Command] = Behaviors.receiveMessagePartial {
    case AskMessage(_, message, reply) =>
      reply ! MessageAsked(IntStatus.OK, clientId, message.reverse)
      Behaviors.same

    case PublishEvent(_, event, payload) =>
      context.log.debug("Receive event: {}, payload: {}", event, payload)
      Behaviors.same

    case QueryResource(_, replyTo) =>
      // Turn the Future result into a message handled by this actor.
      context.pipeToSelf(findExternalResource())(value =>
        InternalQueryResource(value, replyTo))
      Behaviors.same

    case InternalQueryResource(tryValue, replyTo) =>
      replyTo ! tryValue
        .map(ResourceQueried(IntStatus.OK, clientId, _))
        .getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil))
      Behaviors.same

    case Disconnect(_, replyTo) =>
      replyTo ! Disconnected(IntStatus.OK, clientId)
      Behaviors.stopped

    case SessionTimeout =>
      context.log.warn("Inactive timeout, stop!")
      Behaviors.stopped
  }

  // Initial behavior: wait for the Connect that matches this clientId.
  def init(): Behavior[Command] = Behaviors.receiveMessage {
    case Connect(`clientId`, replyTo) =>
      replyTo ! Connected(IntStatus.OK, clientId)
      context.setReceiveTimeout(120.seconds, SessionTimeout)
      active()
    case other =>
      context.log.warn("Receive invalid command: {}", other)
      Behaviors.same
  }

  // Simulates a slow external lookup.
  private def findExternalResource(): Future[Seq[String]] = Future {
    TimeUnit.MILLISECONDS.sleep(10)
    Range(0, 10).map(_.toString).toVector
  }
}
import akka.actor.typed.{ ActorRef, ActorSystem, Props, SpawnProtocol }
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import org.scalatest.{ BeforeAndAfterAll, Matchers, OptionValues, WordSpec }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }

import scala.concurrent.Await
import scala.concurrent.duration._
// AkkaUtils.findActorByServiceKey is a project helper that is not shown here.

class ComplexActorSpec
    extends WordSpec
    with Matchers
    with ScalaFutures
    with OptionValues
    with BeforeAndAfterAll
    with StrictLogging {
  implicit private val system = ActorSystem(SpawnProtocol(), "complex-manager")
  implicit private val timeout = Timeout(2.seconds)
  implicit override def patienceConfig: PatienceConfig =
    PatienceConfig(Span(10, Seconds), Span(50, Millis))

  "ComplexActor" should {
    var complexActor: ActorRef[ComplexActor.Command] = null

    "Create actor from outside of ActorSystem[_]" in {
      complexActor = system
        .ask[ActorRef[ComplexActor.Command]](replyTo =>
          SpawnProtocol.Spawn(ComplexActor(), "complex", Props.empty, replyTo))
        .futureValue
      complexActor.path.name shouldBe "complex"
    }

    "Discover actors using ServiceKey[T]" in {
      val maybeDeepActor =
        AkkaUtils
          .findActorByServiceKey(ComplexActor.serviceKey, 500.millis)
          .futureValue
      val ref = maybeDeepActor.value
      ref shouldBe complexActor
    }

    val client1 = "client1"

    "Connect" in {
      val connected =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.Connect(client1, _))
          .mapTo[ComplexActor.Connected]
          .futureValue
      connected.status should be(IntStatus.OK)
      connected.clientId should be(client1)

      val connectCounted =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.ConnectCount)
          .mapTo[ComplexActor.ConnectCounted]
          .futureValue
      connectCounted.count should be > 0
    }

    "AskMessage" in {
      val messageAsked = complexActor
        .ask[ComplexActor.Reply](ComplexActor.AskMessage(client1, "hello", _))
        .mapTo[ComplexActor.MessageAsked]
        .futureValue
      messageAsked should be(
        ComplexActor.MessageAsked(IntStatus.OK, client1, "olleh"))
    }

    "QueryResource" in {
      val queried = complexActor
        .ask[ComplexActor.Reply](ComplexActor.QueryResource(client1, _))
        .mapTo[ComplexActor.ResourceQueried]
        .futureValue
      queried.status should be(IntStatus.OK)
      queried.resources should contain("3")
    }

    "Disconnect" in {
      val disconnected = complexActor
        .ask[ComplexActor.Reply](replyTo =>
          ComplexActor.Disconnect(client1, replyTo))
        .mapTo[ComplexActor.Disconnected]
        .futureValue
      disconnected.status should be(IntStatus.OK)
      disconnected.clientId should be(client1)

      val connectCounted =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.ConnectCount)
          .mapTo[ComplexActor.ConnectCounted]
          .futureValue
      connectCounted.count should be(0)
    }

    "AskMessage return 404" in {
      val messageAsked = complexActor
        .ask[ComplexActor.Reply](ComplexActor.AskMessage(client1, "hello", _))
        .mapTo[ComplexActor.ReplyError]
        .futureValue
      messageAsked.status should be(IntStatus.NOT_FOUND)
    }
  }

  override protected def afterAll(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, 10.seconds)
  }
}