gRPC服务

定义消息和服务

syntax = "proto3";

option java_multiple_files = true;

package greeter;

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}

service GreeterService {
    // req-resp
    rpc SayHello (HelloRequest) returns (HelloReply) {}

    // keep requests
    rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

    // keep responses
    rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

    // keep requests & responses
    rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

这里定义了两个消息:HelloRequestHelloReplyGreeterService服务,GreeterService定义了4个服务方法,分别是:

  • SayHello:经典的请求-响应服务,发送一个请求获得一个响应;
  • ItKeepsTalking:持续不断的发送多个请求,在请求停止后获得一个响应;
  • ItKeepsReplying:发送一个请求,获得持续不断的多个响应;
  • StreamHelloes:持续不断的发送响应的同时也可获得持续不断的响应,可以通过Source.queue来获得可发送数据的Queue和获得响应数据的Source

实现 gRPC 服务

class GreeterServiceImpl()(implicit system: ActorSystem[_]) extends GreeterService {
  import system.executionContext

  override def sayHello(in: HelloRequest): Future[HelloReply] = {
    Future.successful(HelloReply(s"Hello, ${in.name}."))
  }

  override def itKeepsTalking(
      in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
    in.runWith(Sink.seq)
      .map(ins => HelloReply("Hello, " + ins.map(_.name).mkString("", ", ", ".")))
  }

  override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
    Source
      .fromIterator(() => Iterator.from(1))
      .map(n => HelloReply(s"Hello, ${in.name}; this is $n times."))
  }

  override def streamHellos(
      ins: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
    ins.map(in => HelloReply(s"Hello, ${in.name}."))
  }
}
Note

有关Akka Streams的更多内容可阅读作者写的另一本开源电子书: Akka Cookbook

测试 gRPC 服务

通过 Scalatest 对实现的4个gRPC服务进行测试,下面是单元测试代码:

"sayHello" in {
  greeterClient.sayHello(HelloRequest("Scala")).futureValue should ===(
    HelloReply("Hello, Scala."))
}

"itKeepsReplying" in {
  greeterClient
    .itKeepsReplying(HelloRequest("Scala"))
    .take(5)
    .runWith(Sink.seq)
    .futureValue should ===(
    Seq(
      HelloReply("Hello, Scala; this is 1 times."),
      HelloReply("Hello, Scala; this is 2 times."),
      HelloReply("Hello, Scala; this is 3 times."),
      HelloReply("Hello, Scala; this is 4 times."),
      HelloReply("Hello, Scala; this is 5 times.")))
}

"itKeepsTalking" in {
  val (queue, in) =
    Source
      .queue[HelloRequest](16, OverflowStrategy.backpressure)
      .preMaterialize()
  val f = greeterClient.itKeepsTalking(in)
  Seq("Scala", "Java", "Groovy", "Kotlin").foreach(program =>
    queue.offer(HelloRequest(program)))
  TimeUnit.SECONDS.sleep(1)
  queue.complete()
  f.futureValue should ===(HelloReply("Hello, Scala, Java, Groovy, Kotlin."))
}

"streamHellos" in {
  val (queue, in) =
    Source
      .queue[HelloRequest](16, OverflowStrategy.backpressure)
      .preMaterialize()
  val f = greeterClient.streamHellos(in).runWith(Sink.seq)
  Seq("Scala", "Java", "Groovy", "Kotlin").foreach(item =>
    queue.offer(HelloRequest(item)))
  TimeUnit.SECONDS.sleep(1)
  queue.complete()
  f.futureValue should ===(
    Seq(
      HelloReply("Hello, Scala."),
      HelloReply("Hello, Java."),
      HelloReply("Hello, Groovy."),
      HelloReply("Hello, Kotlin.")))
}

在运行测试前需要先启动gRPC服务,在 Scalatest 的beforeAll函数内启动gRPC HTTP 2服务:

override protected def beforeAll(): Unit = {
  super.beforeAll()
  val handler = GreeterServiceHandler(new GreeterServiceImpl())
  Http().bindAndHandleAsync(handler, "localhost", 8000)
  greeterClient = GreeterServiceClient(
    GrpcClientSettings.fromConfig(GreeterService.name))
}

在构造 GreeterServiceClient gRCP客户端时需要提供GrpcClientSettings设置选项,这里通过调用fromConfig函数来从 HOCON 配置文件里读取gRPC服务选项,相应的application-test.conf配置文件内容如下:

akka.http.server.preview.enable-http2 = on
akka.grpc.client {
  "greeter.GreeterService" {
    host = "localhost"
    port = 8000
    use-tls = false
  }
}

其中use-tls设置gRPC客户端不使用HTTPs建立连接,因为我们这个单元测试启动的gRPC HTTP服务不未启动SSL/TLS。