Akka Typed Actor

Hello Scala!

Akka Typed Actor从2.4开始直到2.5可以商用,进而Akka 2.6已经把Akka Typed Actor做为推荐的Actor使用模式。Typed Actor与原先的Untyped Actor最大的区别Actor有类型了,其签名也改成了akka.actor.typed.ActorRef[T]。通过一个简单的示例来看看在Akka Typed环境下怎样使用Actor。

sealed trait Command
final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command
final case class Tell(message: String) extends Command

sealed trait Reply
final case class HelloReply(message: String) extends Reply

def apply(): Behavior[Command] = Behaviors.setup { context =>
  Behaviors.receiveMessage {
    case Hello(message, replyTo) =>
      replyTo ! HelloReply(s"$message, scala!")
      Behaviors.same
    case Tell(message) =>
      context.log.debug("收到消息:{}", message)
      Behaviors.same
  }
}

Akka Typed不再需要通过类的形式来实现Actor接口定义,而是函数的形式来定义actor。可以看到,定义的actor类型为Behavior[T](**形为**),通过Behaviors.receiveMessage[T](T => Behavior[T]): Receive[T]函数来处理接收到的消息,而Receive继承了Behavior trait。通过函数签名可以看到,每次接收到消息并对其处理完成后,都必需要返回一个新的形为。

apply(): Behavior[Command]函数签名里的范性参数类型Command限制了这个actor将只接收CommandCommand子类型的消息,编译器将在编译期对传给actor的消息做类型检查,相对于从前的untyped actor可以向actor传入任何类型的消息,这可以限制的减少程序中的bug。特别是在程序规模很大,当你定义了成百上千个消息时。

也因为有类型的actor,在Akka Typed中没有了隐式发送的sender: ActorRef,必需在发送的消息里面包含回复字段,就如Hello消息定义里的replyTo: ActorRef[Reply]字段一样。actor在处理完Hello消息后可以通过它向发送者回复处理结果。

"tell" in {
  val actorRef = spawn(HelloScala(), "tell")
  actorRef ! Tell("Hello")
}

"replyTo" in {
  val actorRef = spawn(HelloScala(), "replyTo")
  val probe = createTestProbe[Reply]()
  actorRef ! Hello("hello", probe.ref)
  probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!"))
}

"ask" in {
  import akka.actor.typed.scaladsl.AskPattern._
  val actorRef = spawn(HelloScala(), "ask")
  val reply =
    actorRef
      .ask[Reply](replyTo => Hello("Hello", replyTo))
      .mapTo[HelloReply]
      .futureValue
  reply.message should be("Hello, scala!")
}

更复杂的一个示例

上一个示例简单的演示了Akka Typed Actor的功能和基本使用方式,接下来看一个更复杂的示例,将展示Akka Typed更多的特性及功能。

首先是消息定义:

  sealed trait Command
  trait ControlCommand extends Command { val clientId: String }
  trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }

  final case class Connect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
  final case class Disconnect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand
  final case class QueryResource(clientId: String, replyTo: ActorRef[Reply]) extends ReplyCommand
  final private[typed] case object SessionTimeout extends Command
  final private case class ServiceKeyRegistered(registered: Receptionist.Registered) extends Command

  sealed trait Reply
  final case class Connected(status: Int, clientId: String) extends Reply
  final case class Disconnected(status: Int, clientId: String) extends Reply
  final case class ResourceQueried(status: Int, clientId: String, resources: Seq[String]) extends Reply
  final case class ReplyError(status: Int) extends Reply

上面分别定义了actor可接收的请求消息:Command和返回结果消息:Reply。建议对于需要返回值的消息使用:replyTo来命名收受返回值的actor字段,这里也可以不定义Reply trait来做为统一的返回值类型,可以直接返回结果类型,如:ActorRef[String

这里将定义两个actor,一个做为父actor,一个做为子actor。父actor为:ComplexActor,管理连接客户端和转发消息到子actor,每次有新的客户端连接上来时做以客户端clientId做为名字创建一个子actor;子actor:ComplexClient,保持客户端连接会话,处理消息……

ComplexActor

final class ComplexActor private(context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._
  private var connects = Map.empty[String, ActorRef[Command]]

  def init(): Behavior[Command] = Behaviors.receiveMessage {
    case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) =>
      context.log.info("Actor be registered, serviceKey: {}", serviceKey)
      receive()
    ....
  }

  def receive(): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case cmd @ Connect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            replyTo ! Connected(IntStatus.CONFLICT, clientId)
          } else {
            val child = context.spawn(
              Behaviors
                .supervise(ComplexClient(clientId))
                .onFailure(SupervisorStrategy.restart),
              clientId)
            context.watch(child)
            connects = connects.updated(clientId, child)
            child ! cmd
          }
          Behaviors.same
        ....
      }
      .receiveSignal {
        case (_, Terminated(child)) =>
          val clientId = child.path.name
          connects -= clientId
          context.unwatch(child)
          Behaviors.same
      }
}

ComplexActor在收到Connect消息后将首先判断请求客户端ID(clientId)是否已经连接,若重复连接将直接返回409错误(Connected(IntStatus.CONFLICT, _))。若是一个新连接将调用context.spawn函数在创建一个字actor:ComplexClientspawn函数签名如下:

def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]

behavior是要创建的actor,name为子actor的名字,需要保证在同一级内唯一(兄弟之间),props可对actor作一些自定义,如:线程执行器(Dispatcher)、邮箱等。

receiveSignal用于接收系统控制信号消息,经典actor的preRestartpostStop回调函数(将分别做为PreRestartPostStop信号),以及Terminated消息都将做为信号发送到这里。

ComplexClient

final class ComplexClient private (
    clientId: String,
    context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._

  def active(): Behavior[Command] = Behaviors.receiveMessagePartial {
    ....
    case SessionTimeout =>
      context.log.warn("Inactive timeout, stop!")
      Behaviors.stopped
  }

  def init(): Behavior[Command] = Behaviors.receiveMessage {
    case Connect(`clientId`, replyTo) =>
      replyTo ! Connected(IntStatus.OK, clientId)
      context.setReceiveTimeout(120.seconds, SessionTimeout)
      active()
    case other =>
      context.log.warn("Receive invalid command: {}", other)
      Behaviors.same
  }

ComplexClient定义了两个形为函数,init()active。当客户端连接成功以后会返回active()函数作为actor新的形为来接收之后的消息。这种返回一个新的Behavior函数的形式替代了经典actor里的becomeunbecome函数,它更直观,甚至还可以使用这种方式来实现**状态机**。

context.setReceiveTimeout(120.seconds, SessionTimeout)用来设置两次消息接收之间的超时时间,这里设备为120秒。可以通过方式来实现服务端会话(session)超时判断,当session超时时返回Behaviors.stopped消息来停止actor(自己)。这里需要注意的是context.stop只能用来停止直接子actor,停止actor自身返回stopped形为即可,这与经典actor有着明显的区别。

发现actor

Akka Typed取消了actorSelection函数,不再允许通过actor path路径来查找ActorRef。取而代之的是使用Receptionist机制来注册服务(actor实例)。也就是说,在Akka Typed中,actor默认情况下是不能查找的,只能通过引用(ActorRef[T])来使用,要么actor之间具有父子关系,要么通过消息传递ActorRef[T]……

object ComplexActor {
  val serviceKey = ServiceKey[Command]("complex")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    val registerAdapter = context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))
    context.system.receptionist ! Receptionist.Register(serviceKey, context.self, registerAdapter)
    new ComplexActor(context).init()
  }
}

上面代码通过Receptionist.Register将actor(context.self引用)以serviceKey注册到Actor系统的**receptionist**表,之后就可以通过serviceKey来发现并获取此actor的引用。

  val actorRef: ActorRef[ComplexActor.Command] = system.receptionist
    .ask[Receptionist.Listing](Receptionist.Find(ComplexActor.serviceKey))
    .map { listing =>
      if (listing.isForKey(serviceKey))
        listing.serviceInstances(serviceKey).head
      else 
        throw new IllegalAccessException(s"Actor reference not found: $serviceKey")
    }

消息适配器

有时候,需要将不匹配的消息发送给actor,比如:把receptionist服务注册结果 Receptionist.Registered发送给一个actor,我们可以通过将消息包装到一个实现了Command trait的case class来实现。如下面的代码示例:

val registerAdapter: ActorRef[Receptionist.Registered] =
  context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))

在使用Receptionist.Register时将registerAdapter作为第3个参数传入,这样服务注册结果就将被包装成ServiceKeyRegistered消息传给actor。

在actor内部处理异步任务

actor内部消息都是串行执行的,在actor内执行异步操作时需要小心。不能在Future的回调函数里直接操作actor内部变量,因为它们很可能在两个不同的线程中。

可以通过context.pipeToSelf将异步结果转换成一个消息传递给actor,这样异步结果将进入actor的邮箱列队,通过正确的消息处理机制来处理。

  case QueryResource(_, replyTo) =>
    context.pipeToSelf(findExternalResource())(value => InternalQueryResource(value, replyTo))
    Behaviors.same

  case InternalQueryResource(tryValue, replyTo) =>
    replyTo ! tryValue
      .map(ResourceQueried(IntStatus.OK, clientId, _))
      .getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil))
    Behaviors.same

在ActorSystem[_]外部创建actor

Akka Typed开始,ActorSystem[T]也拥有一个泛型参数,在构造ActorSystem时需要传入一个默认Behavior[T],并将其作为经典actor下的user守卫(也就类似拥有akka://system-name/user这个路径的actor),同时ActorSystem[T]actorOf函数也被取消。Akka Typed推荐应用都从传给ActorSystem的默认Behavior[T]开始构建actor树。但有时,也许通过ActorSystem[T]的实例来创建actor是有意义的,可以通过将typed的ActorSystem[T]转换成经典的untyped ActorSystem来实现。代码如下:

implicit val timeout = Timeout(2.seconds)
implicit val system: ActorSystem[_] = _ // ....

val spawnActor: ActorRef[SpawnProtocol.Command] = system.toClassic
  .actorOf(
    PropsAdapter(Behaviors.supervise(SpawnProtocol())
      .onFailure(SupervisorStrategy.resume)), "spawn")
  .toTyped[SpawnProtocol.Command]

val helloScalaF: Future[ActorRef[HelloScala.Command]] = 
  spawnActor.ask[ActorRef[HelloScala.Command]](replyTo =>
    SpawnProtocol.Spawn(HelloScala(), "sample", Props.empty, replyTo))

val helloScala: ActorRef[HelloScala.Command] = Await.result(helloScalaF, 2.seconds)

也可以将SpawnProtocol()作为ActorSystem[_]的初始Behavior[T]来构造ActorSystem,这样就可以通过system.ask[ActorRef[T]](SpawnProtocol.Spawn(....))来创建在user守卫下的actor了。

小结

本文通过两个例子展示了Akka Typed的特性,它与经典actor的区别还是挺大的。从untyped和typed,actor拥有了类型,这对于大规模actor系统开发可以在编译期发现很多重复,它将强制你在设计actor时首先考虑消息的定义。定义的消息即是actor之间的数据交互协议,消息定义的过程也是业务模式和模块划分的过程。

完整示例代码

HelloScala.scala

object HelloScala {
  sealed trait Command
  final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command
  final case class Tell(message: String) extends Command

  sealed trait Reply
  final case class HelloReply(message: String) extends Reply

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    Behaviors.receiveMessage {
      case Hello(message, replyTo) =>
        replyTo ! HelloReply(s"$message, scala!")
        Behaviors.same
      case Tell(message) =>
        context.log.debug("收到消息:{}", message)
        Behaviors.same
    }
  }
}

class HelloScalaSpec extends ScalaTestWithActorTestKit with WordSpecLike {
  import HelloScala._

  "HelloScala" should {
    "tell" in {
      val actorRef = spawn(HelloScala(), "tell")
      actorRef ! Tell("Hello")
    }

    "replyTo" in {
      val actorRef = spawn(HelloScala(), "replyTo")
      val probe = createTestProbe[Reply]()
      actorRef ! Hello("hello", probe.ref)
      probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!"))
    }

    "ask" in {
      import akka.actor.typed.scaladsl.AskPattern._
      val actorRef = spawn(HelloScala(), "ask")
      val reply =
        actorRef
          .ask[Reply](replyTo => Hello("Hello", replyTo))
          .mapTo[HelloReply]
          .futureValue
      reply.message should be("Hello, scala!")
    }
  }
}

ComplexActor.scala

object ComplexActor {
  sealed trait Command
  trait ControlCommand extends Command { val clientId: String }
  trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }

  final case class Connect(clientId: String, replyTo: ActorRef[Reply])
      extends ControlCommand
      with ReplyCommand
  final case class Disconnect(clientId: String, replyTo: ActorRef[Reply])
      extends ControlCommand
      with ReplyCommand
  final case class AskMessage(
      clientId: String,
      message: String,
      replyTo: ActorRef[Reply])
      extends ReplyCommand
  final case class ConnectCount(replyTo: ActorRef[Reply]) extends ReplyCommand
  final case class QueryResource(clientId: String, replyTo: ActorRef[Reply])
      extends ReplyCommand
  final case class PublishEvent(clientId: String, event: String, payload: String)
      extends Command
  final private[typed] case object SessionTimeout extends Command
  final private case class ServiceKeyRegistered(
      registered: Receptionist.Registered)
      extends Command

  sealed trait Reply
  final case class Connected(status: Int, clientId: String) extends Reply
  final case class Disconnected(status: Int, clientId: String) extends Reply
  final case class MessageAsked(status: Int, clientId: String, reply: String)
      extends Reply
  final case class ConnectCounted(count: Int, status: Int = IntStatus.OK)
      extends Reply
  final case class ResourceQueried(
      status: Int,
      clientId: String,
      resources: Seq[String])
      extends Reply
  final case class ReplyError(status: Int) extends Reply

  val serviceKey = ServiceKey[Command]("complex")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    val registerAdapter =
      context.messageAdapter[Receptionist.Registered](value =>
        ServiceKeyRegistered(value))
    context.system.receptionist ! Receptionist
      .Register(serviceKey, context.self, registerAdapter)
    new ComplexActor(context).init()
  }
}

final class ComplexActor(context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._
  private var connects = Map.empty[String, ActorRef[Command]]

  def init(): Receive[Command] =
    Behaviors.receiveMessage[Command] {
      case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) =>
        context.log.info("Actor be registered, serviceKey: {}", serviceKey)
        receive()
      case cmd: ReplyCommand =>
        cmd.replyTo ! ReplyError(IntStatus.SERVICE_UNAVAILABLE)
        Behaviors.same
      case other =>
        context.log
          .warn("Actor not registered, receive invalid message: {}", other)
        Behaviors.same
    }

  def receive(): Behavior[Command] =
    Behaviors
      .receiveMessagePartial[Command] {
        case cmd @ Connect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            replyTo ! Connected(IntStatus.CONFLICT, clientId)
          } else {
            val child = context.spawn(
              Behaviors
                .supervise(
                  ComplexClient(clientId, context.self.narrow[ControlCommand]))
                .onFailure(SupervisorStrategy.restart),
              clientId)
            context.watch(child)
            connects = connects.updated(clientId, child)
            child ! cmd
          }
          Behaviors.same

        case cmd @ Disconnect(clientId, replyTo) =>
          if (connects.contains(clientId)) {
            connects(clientId) ! cmd
          } else {
            replyTo ! Disconnected(IntStatus.NOT_FOUND, clientId)
          }
          Behaviors.same

        case cmd: AskMessage =>
          connects.get(cmd.clientId) match {
            case Some(ref) => ref ! cmd
            case None      => cmd.replyTo ! ReplyError(IntStatus.NOT_FOUND)
          }
          Behaviors.same

        case event: PublishEvent =>
          connects.get(event.clientId).foreach(_ ! event)
          Behaviors.same

        case ConnectCount(replyTo) =>
          replyTo ! ConnectCounted(connects.size)
          Behaviors.same

        case cmd: QueryResource =>
          connects.get(cmd.clientId) match {
            case Some(ref) => ref ! cmd
            case None      => cmd.replyTo ! ReplyError(IntStatus.NOT_FOUND)
          }
          Behaviors.same
      }
      .receiveSignal {
        case (_, Terminated(child)) =>
          val clientId = child.path.name
          connects -= clientId
          context.unwatch(child)
          Behaviors.same
      }
}

object ComplexClient {
  import ComplexActor._

  final private[typed] case class InternalQueryResource(
      resources: Try[Seq[String]],
      replyTo: ActorRef[Reply])
      extends Command

  def apply(
      clientId: String,
      parent: ActorRef[ControlCommand]): Behavior[Command] =
    Behaviors.setup { context =>
      Behaviors.withTimers(timers =>
        new ComplexClient(clientId, parent, timers, context).init())
    }
}

final class ComplexClient private (
    clientId: String,
    parent: ActorRef[ComplexActor.ControlCommand],
    timers: TimerScheduler[ComplexActor.Command],
    context: ActorContext[ComplexActor.Command]) {
  import ComplexActor._
  import ComplexClient._
  import context.executionContext

  def active(): Behavior[Command] = Behaviors.receiveMessagePartial {
    case AskMessage(_, message, reply) =>
      reply ! MessageAsked(IntStatus.OK, clientId, message.reverse)
      Behaviors.same

    case PublishEvent(_, event, payload) =>
      context.log.debug("Receive event: {}, payload: {}", event, payload)
      Behaviors.same

    case QueryResource(_, replyTo) =>
      context.pipeToSelf(findExternalResource())(value =>
        InternalQueryResource(value, replyTo))
      Behaviors.same

    case InternalQueryResource(tryValue, replyTo) =>
      replyTo ! tryValue
        .map(ResourceQueried(IntStatus.OK, clientId, _))
        .getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil))
      Behaviors.same

    case Disconnect(_, replyTo) =>
      replyTo ! Disconnected(IntStatus.OK, clientId)
      Behaviors.stopped

    case SessionTimeout =>
      context.log.warn("Inactive timeout, stop!")
      Behaviors.stopped
  }

  def init(): Behavior[Command] = Behaviors.receiveMessage {
    case Connect(`clientId`, replyTo) =>
      replyTo ! Connected(IntStatus.OK, clientId)
      context.setReceiveTimeout(120.seconds, SessionTimeout)
      active()
    case other =>
      context.log.warn("Receive invalid command: {}", other)
      Behaviors.same
  }

  private def findExternalResource(): Future[Seq[String]] = Future {
    TimeUnit.MILLISECONDS.sleep(10)
    Range(0, 10).map(_.toString).toVector
  }
}

class ComplexActorSpec
    extends WordSpec
    with Matchers
    with ScalaFutures
    with OptionValues
    with BeforeAndAfterAll
    with StrictLogging {
  implicit private val system = ActorSystem(SpawnProtocol(), "complex-manager")
  implicit private val timeout = Timeout(2.seconds)

  implicit override def patienceConfig: PatienceConfig =
    PatienceConfig(Span(10, Seconds), Span(50, Millis))

  "ComplexActor" should {
    var complexActor: ActorRef[ComplexActor.Command] = null

    "Create actor from outside of ActorSystem[_]" in {
      complexActor = system
        .ask[ActorRef[ComplexActor.Command]](replTo =>
          SpawnProtocol.Spawn(ComplexActor(), "complex", Props.empty, replTo))
        .futureValue
      complexActor.path.name shouldBe "complex"
    }

    "Discover actors using ServiceKey[T]" in {
      val maybeDeepActor =
        AkkaUtils
          .findActorByServiceKey(ComplexActor.serviceKey, 500.millis)
          .futureValue
      val ref = maybeDeepActor.value
      ref shouldBe complexActor
    }

    val client1 = "client1"

    "Connect" in {
      val connected =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.Connect(client1, _))
          .mapTo[ComplexActor.Connected]
          .futureValue
      connected.status should be(IntStatus.OK)
      connected.clientId should be(client1)

      val connectCounted =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.ConnectCount)
          .mapTo[ComplexActor.ConnectCounted]
          .futureValue
      connectCounted.count should be > 0
    }

    "AskMessage" in {
      val messageAsked = complexActor
        .ask[ComplexActor.Reply](ComplexActor.AskMessage(client1, "hello", _))
        .mapTo[ComplexActor.MessageAsked]
        .futureValue
      messageAsked should be(
        ComplexActor.MessageAsked(IntStatus.OK, client1, "olleh"))
    }

    "QueryResource" in {
      val queried = complexActor
        .ask[ComplexActor.Reply](ComplexActor.QueryResource(client1, _))
        .mapTo[ComplexActor.ResourceQueried]
        .futureValue
      queried.status should be(IntStatus.OK)
      queried.resources should contain("3")
    }

    "Disconnect" in {
      val disconnected = complexActor
        .ask[ComplexActor.Reply](replyTo =>
          ComplexActor.Disconnect(client1, replyTo))
        .mapTo[ComplexActor.Disconnected]
        .futureValue
      disconnected.status should be(IntStatus.OK)
      disconnected.clientId should be(client1)

      val connectCounted =
        complexActor
          .ask[ComplexActor.Reply](ComplexActor.ConnectCount)
          .mapTo[ComplexActor.ConnectCounted]
          .futureValue
      connectCounted.count should be(0)
    }

    "AskMessage return 404" in {
      val messageAsked = complexActor
        .ask[ComplexActor.Reply](ComplexActor.AskMessage(client1, "hello", _))
        .mapTo[ComplexActor.ReplyError]
        .futureValue
      messageAsked.status should be(IntStatus.NOT_FOUND)
    }
  }

  override protected def afterAll(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, 10.seconds)
  }
}