路由 DSL

Routing DSL

In addition to the Core Server API Akka HTTP provides a very flexible “Routing DSL” for elegantly defining RESTful web services. It picks up where the low-level API leaves off and offers much of the higher-level functionality of typical web servers or frameworks, like deconstruction of URIs, content negotiation or static content serving.

核心服务器 API 之外,Akka HTTP 提供了一个很灵活的 “路由 DSL” 来优雅的定义 RESTful Web 服务。 它改善低级 API 的不足,同时提供典型 Web 服务器或框架的很多高级功能,比如:解构 URI、内容协商或者静态内容服务。

Note

It is recommended to read the Implications of the streaming nature of Request/Response Entities section, as it explains the underlying full-stack streaming concepts, which may be unexpected when coming from a background with non-“streaming first” HTTP Servers.

推荐阅读 请求/响应实体流的实质含义 部分,它阐明了(Akka HTTP)底层的全栈流的概念。 因为对于没有“流式优先” HTTP 服务器概念背景的人来说,也许会感到难以理解。

Minimal Example

小型示例

This is a complete, very basic Akka HTTP application relying on the Routing DSL:

这是一个完整的、最基本的依靠路由 DSL 的 Akka HTTP 应用程序:

Scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;

import java.util.concurrent.CompletionStage;

public class HttpServerMinimalExampleTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem system = ActorSystem.create("routes");

    final Http http = Http.get(system);
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    //In order to access all directives we need an instance where the routes are define.
    HttpServerMinimalExampleTest app = new HttpServerMinimalExampleTest();

    final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
        ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  private Route createRoute() {
    return concat(
        path("hello", () ->
            get(() ->
                complete("<h1>Say hello to akka-http</h1>"))));
  }
}

It starts an HTTP Server on localhost and replies to GET requests to /hello with a simple response.

它在本地启动一个 HTTP 服务器,并使用简单响应回复到 /hello 的 GET 请求。

API may change

The following example uses an experimental feature and its API is subjected to change in future releases of Akka HTTP. For further information about this marker, see The @DoNotInherit and @ApiMayChange markers in the Akka documentation.

下面示例使用实验特性,它的 API 在未来 Akka HTTP 发布时随时变化。 有关此标记的更多信息,见 Akka 文档里的 @DoNotInherit 和 @ApiMayChange 标记

To help start a server Akka HTTP provides an experimental helper class called HttpAppHttpApp. This is the same example as before rewritten using HttpAppHttpApp:

为了帮助启动一个服务,Akka HTTP 提供一个实验性辅助类 HttpAppHttpApp 。 这是使用 HttpAppHttpApp 重写之前的相同示例:

Scala
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.http.scaladsl.server.HttpApp
import akka.http.scaladsl.server.Route

// Server definition
object WebServer extends HttpApp {
  override def routes: Route =
    path("hello") {
      get {
        complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
      }
    }
}

// Starting the server
WebServer.startServer("localhost", 8080)
Java

// Server definition class MinimalHttpApp extends HttpApp { @Override protected Route routes() { return path("hello", () -> get(() -> complete("<h1>Say hello to akka-http</h1>") ) ); } } // Starting the server final MinimalHttpApp myServer = new MinimalHttpApp(); myServer.startServer("localhost", 8080);

See HttpApp Bootstrap for more details about setting up a server using this approach.

有关使用这个方式设置服务器的更多详细信息,见 HttpApp 引导程序

Longer Example

较长的示例

The following is an Akka HTTP route definition that tries to show off a few features. The resulting service does not really do anything useful but its definition should give you a feel for what an actual API definition with the Routing DSL will look like:

下面是一个 Akka HTTP 路由定义,试图展示一些功能。服务实际上没有做任何有用的事,但是,它的定义应该让你了解了路由 DSL 的实际 API 定义看起来像:

import akka.actor.{ ActorRef, ActorSystem }
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout

// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
type TransactionResult = String
case class User(name: String)
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)

// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???

// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???

val route = concat(
  path("orders") {
    authenticateBasic(realm = "admin area", myAuthenticator) { user =>
      concat(
        get {
          encodeResponseWith(Deflate) {
            complete {
              // marshal custom object with in-scope marshaller
              retrieveOrdersFromDB
            }
          }
        },
        post {
          // decompress gzipped or deflated requests if required
          decodeRequest {
            // unmarshal with in-scope unmarshaller
            entity(as[Order]) { order =>
              complete {
                // ... write order to DB
                "Order received"
              }
            }
          }
        })
    }
  },
  // extract URI path element as Int
  pathPrefix("order" / IntNumber) { orderId =>
    concat(
      pathEnd {
        concat(
          (put | parameter('method ! "put")) {
            // form extraction from multipart or www-url-encoded forms
            formFields(('email, 'total.as[Money])).as(Order) { order =>
              complete {
                // complete with serialized Future result
                (myDbActor ? Update(order)).mapTo[TransactionResult]
              }
            }
          },
          get {
            // debugging helper
            logRequest("GET-ORDER") {
              // use in-scope marshaller to create completer function
              completeWith(instanceOf[Order]) { completer =>
                // custom
                processOrderRequest(orderId, completer)
              }
            }
          })
      },
      path("items") {
        get {
          // parameters to case class extraction
          parameters(('size.as[Int], 'color ?, 'dangerous ? "no"))
            .as(OrderItem) { orderItem =>
              // ... route using case class instance created from
              // required and optional query parameters
            }
        }
      })
  },
  pathPrefix("documentation") {
    // optionally compresses the response with Gzip or Deflate
    // if the client accepts compressed responses
    encodeResponse {
      // serve up static content from a JAR resource
      getFromResourceDirectory("docs")
    }
  },
  path("oldApi" / Remaining) { pathRest =>
    redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
  }
)

Interaction with Akka Typed

与 Akka Typed 的交互

Since Akka version 2.5.22, Akka typed became ready for production, Akka HTTP, however, is still using the untyped ActorSystem. This following example will demonstrate how to use Akka HTTP and Akka Typed together within the same application.

从 Akka 2.5.22 版本开始,Akka typed 可用于生产,但是,Akka HTTP 将继续使用无类型的 ActorSystem。 下面的例子将演示在同一个程序里怎样一起使用 Akka HTTP 和 Akka Typed。

We will create a small web server responsible to record build jobs with its state and duration, query jobs by id and status, and clear the job history.

我们将创建一个小型 Web 服务器,服务器负责记录使用状态和持续时间的构建作业、通过 id 和状态查询作业、和清理作业历史。

First let’s start by defining the Behavior that will act as a repository for the build job information, this isn’t strictly needed for our sample but just to have an actual actor to interact with:

首先让我们开始于通过定义的 Behavior,它将作为构建作业信息的存储库,对于我们的示例这不是必须的,仅仅是为了有一个实际的 actor 与之交互:

Scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object JobRepository {

  // Definition of the a build job and its possible status values
  sealed trait Status
  object Successful extends Status
  object Failed extends Status

  final case class Job(id: Long, projectName: String, status: Status, duration: Long)
  final case class Jobs(jobs: Seq[Job])

  // Trait defining successful and failure responses
  sealed trait Response
  case object OK extends Response
  final case class KO(reason: String) extends Response

  // Trait and its implementations representing all possible messages that can be sent to this Behavior
  sealed trait Command
  final case class AddJob(job: Job, replyTo: ActorRef[Response]) extends Command
  final case class GetJobById(id: Long, replyTo: ActorRef[Option[Job]]) extends Command
  final case class GetJobByStatus(status: Status, replyTo: ActorRef[Seq[Job]]) extends Command
  final case class ClearJobs(replyTo: ActorRef[Response]) extends Command

  // This behavior handles all possible incoming messages and keeps the state in the function parameter
  def apply(jobs: Map[Long, Job] = Map.empty): Behavior[Command] = Behaviors.receiveMessage {
    case AddJob(job, replyTo) if jobs.contains(job.id) =>
      replyTo ! KO("Job already exists")
      Behaviors.same
    case AddJob(job, replyTo) =>
      replyTo ! OK
      JobRepository(jobs.+(job.id -> job))
    case GetJobById(id, replyTo) =>
      replyTo ! jobs.get(id)
      Behaviors.same
    case ClearJobs(replyTo) =>
      replyTo ! OK
      JobRepository(Map.empty)
  }

}

Then, let’s define the JSON marshaller and unmarshallers for the HTTP routes:

然后,让我们定义 HTTP 路由的 JSON 编组和解组:

Scala
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
import spray.json.DeserializationException
import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat

trait JsonSupport extends SprayJsonSupport {
  // import the default encoders for primitive types (Int, String, Lists etc)
  import DefaultJsonProtocol._
  import JobRepository._

  implicit object StatusFormat extends RootJsonFormat[Status] {
    def write(status: Status): JsValue = status match {
      case Failed     => JsString("Failed")
      case Successful => JsString("Successful")
    }

    def read(json: JsValue): Status = json match {
      case JsString("Failed")     => Failed
      case JsString("Successful") => Successful
      case _                      => throw new DeserializationException("Status unexpected")
    }
  }

  implicit val jobFormat = jsonFormat4(Job)

  implicit val jobsFormat = jsonFormat1(Jobs)
}

Next step is to define the RouteRoute that will communicate with the previously defined behavior and handle all its possible responses:

下一步是定义 RouteRoute,它将与之前定义的行为通信并处理所有可能的响应:

Scala
import akka.actor.typed.ActorSystem
import akka.util.Timeout

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route

import scala.concurrent.duration._
import scala.concurrent.Future

class JobRoutes(buildJobRepository: ActorRef[JobRepository.Command])(implicit system: ActorSystem[_]) extends JsonSupport {

  import akka.actor.typed.scaladsl.AskPattern._

  // asking someone requires a timeout and a scheduler, if the timeout hits without response
  // the ask is failed with a TimeoutException
  implicit val timeout: Timeout = 3.seconds
  // implicit scheduler only needed in 2.5
  // in 2.6 having an implicit typed ActorSystem in scope is enough
  implicit val scheduler = system.scheduler

  lazy val theJobRoutes: Route =
    pathPrefix("jobs") {
      concat(
        pathEnd {
          concat(
            post {
              entity(as[JobRepository.Job]) { job =>
                val operationPerformed: Future[JobRepository.Response] =
                  buildJobRepository.ask(JobRepository.AddJob(job, _))
                onSuccess(operationPerformed) {
                  case JobRepository.OK         => complete("Job added")
                  case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
                }
              }
            },
            delete {
              val operationPerformed: Future[JobRepository.Response] =
                buildJobRepository.ask(JobRepository.ClearJobs(_))
              onSuccess(operationPerformed) {
                case JobRepository.OK         => complete("Jobs cleared")
                case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
              }
            }
          )
        },
        (get & path(LongNumber)) { id =>
          val maybeJob: Future[Option[JobRepository.Job]] =
            buildJobRepository.ask(JobRepository.GetJobById(id, _))
          rejectEmptyResponse {
            complete(maybeJob)
          }
        }
      )
    }
}

Finally, we create a Behavior that bootstraps the web server and use it as the root behavior of our actor system:

最后,我们创建一个 Behavior 来引导 Web 服务器,并使用它作为我们 actor 系统的根行为:

Scala
import akka.actor.typed.PostStop
import akka.actor.typed.scaladsl.adapter._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.Http

import scala.concurrent.ExecutionContextExecutor
import scala.util.{ Success, Failure }

object Server {

  sealed trait Message
  private final case class StartFailed(cause: Throwable) extends Message
  private final case class Started(binding: ServerBinding) extends Message
  case object Stop extends Message

  def apply(host: String, port: Int): Behavior[Message] = Behaviors.setup { ctx =>

    implicit val system = ctx.system
    // http doesn't know about akka typed so provide untyped system
    implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
    // implicit materializer only required in Akka 2.5
    // in 2.6 having an implicit classic or typed ActorSystem in scope is enough
    implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toClassic)
    implicit val ec: ExecutionContextExecutor = ctx.system.executionContext

    val buildJobRepository = ctx.spawn(JobRepository(), "JobRepository")
    val routes = new JobRoutes(buildJobRepository)

    val serverBinding: Future[Http.ServerBinding] =
      Http.apply().bindAndHandle(routes.theJobRoutes, host, port)
    ctx.pipeToSelf(serverBinding) {
      case Success(binding) => Started(binding)
      case Failure(ex)      => StartFailed(ex)
    }

    def running(binding: ServerBinding): Behavior[Message] =
      Behaviors.receiveMessagePartial[Message] {
        case Stop =>
          ctx.log.info(
            "Stopping server http://{}:{}/",
            binding.localAddress.getHostString,
            binding.localAddress.getPort)
          Behaviors.stopped
      }.receiveSignal {
        case (_, PostStop) =>
          binding.unbind()
          Behaviors.same
      }

    def starting(wasStopped: Boolean): Behaviors.Receive[Message] =
      Behaviors.receiveMessage[Message] {
        case StartFailed(cause) =>
          throw new RuntimeException("Server failed to start", cause)
        case Started(binding) =>
          ctx.log.info(
            "Server online at http://{}:{}/",
            binding.localAddress.getHostString,
            binding.localAddress.getPort)
          if (wasStopped) ctx.self ! Stop
          running(binding)
        case Stop =>
          // we got a stop message but haven't completed starting yet,
          // we cannot stop until starting has completed
          starting(wasStopped = true)
      }

    starting(wasStopped = false)
  }
}

def main(args: Array[String]) {
  val system: ActorSystem[Server.Message] =
    ActorSystem(Server("localhost", 8080), "BuildJobsServer")
}

Note that the akka.actor.typed.ActorSystem is converted with toClassic, which comes from import akka.actor.typed.scaladsl.adapter._. If you are using an earlier version than Akka 2.5.26 this conversion method is named toUntyped.

注意,akka.actor.typed.ActorSystem 是用 toClassic 转换,它来自 import akka.actor.typed.scaladsl.adapter._。 如果你使用比 Akka 2.5.26 更早的版本,则这个转换方法称为 toUntyped

Dynamic Routing Example

动态路由示例

As the routes are evaluated for each request, it is possible to make changes at runtime. Please note that every access may happen on a separated thread, so any shared mutable state must be thread safe.

在为每个请求评估路由时,可以在运行时进行改变。请注意,因为每次访问都可能发生在单独的线程,所以任何共享可变状态都必须是线程安全的。

The following is an Akka HTTP route definition that allows dynamically adding new or updating mock endpoints with associated request-response pairs at runtime.

下面是一个 Akka HTTP 路由定义,它允许在运行时动态添加带有关联请求-响应对的新的或更新模拟端点。

Scala
case class MockDefinition(path: String, requests: Seq[JsValue], responses: Seq[JsValue])
implicit val format = jsonFormat3(MockDefinition)

@volatile var state = Map.empty[String, Map[JsValue, JsValue]]

// fixed route to update state
val fixedRoute: Route = post {
  pathSingleSlash {
    entity(as[MockDefinition]) { mock =>
      val mapping = mock.requests.zip(mock.responses).toMap
      state = state + (mock.path -> mapping)
      complete("ok")
    }
  }
}

// dynamic routing based on current state
val dynamicRoute: Route = ctx => {
  val routes = state.map {
    case (segment, responses) =>
      post {
        path(segment) {
          entity(as[JsValue]) { input =>
            complete(responses.get(input))
          }
        }
      }
  }
  concat(routes.toList: _*)(ctx)
}

val route = fixedRoute ~ dynamicRoute
Java
final private Map<String, Map<JsonNode, JsonNode>> state = new ConcurrentHashMap<>();

private Route createRoute() {
  // fixed route to update state
  Route fixedRoute = post(() ->
    pathSingleSlash(() ->
      entity(Jackson.unmarshaller(MockDefinition.class), mock -> {
        Map<JsonNode, JsonNode> mappings = new HashMap<>();
        int size = Math.min(mock.getRequests().size(), mock.getResponses().size());
        for (int i = 0; i < size; i++) {
          mappings.put(mock.getRequests().get(i), mock.getResponses().get(i));
        }
        state.put(mock.getPath(), mappings);
        return complete("ok");
      })
    )
  );

  // dynamic routing based on current state
  Route dynamicRoute = post(() ->
    state.entrySet().stream().map(mock ->
      path(mock.getKey(), () ->
        entity(Jackson.unmarshaller(JsonNode.class), input ->
          complete(StatusCodes.OK, mock.getValue().get(input), Jackson.marshaller())
        )
      )
    ).reduce(reject(), Route::orElse)
  );

  return concat(fixedRoute, dynamicRoute);
}

private static class MockDefinition {
  private final String path;
  private final List<JsonNode> requests;
  private final List<JsonNode> responses;

  public MockDefinition(@JsonProperty("path") String path,
                        @JsonProperty("requests") List<JsonNode> requests,
                        @JsonProperty("responses") List<JsonNode> responses) {
    this.path = path;
    this.requests = requests;
    this.responses = responses;
  }

  public String getPath() {
    return path;
  }

  public List<JsonNode> getRequests() {
    return requests;
  }

  public List<JsonNode> getResponses() {
    return responses;
  }
}

For example, let’s say we do a POST request with body:

例如,我们对正文执行一个 POST 请求。

{
  "path": "test",
  "requests": [
    {"id": 1},
    {"id": 2}
  ],
  "responses": [
    {"amount": 1000},
    {"amount": 2000}
  ]
}

Subsequent POST request to /test with body {"id": 1} will be responded with {"amount": 1000}.

对正文 {"id": 1}/test 的后续 POST 请求将以 {"amount": 1000] 响应。

Handling HTTP Server failures in the High-Level API

在高级 API 里处理 HTTP 服务器失败

There are various situations when failure may occur while initialising or running an Akka HTTP server. Akka by default will log all these failures, however sometimes one may want to react to failures in addition to them just being logged, for example by shutting down the actor system, or notifying some external monitoring end-point explicitly.

当初始化和运行 Akka HTTP 服务器时,各种失败都可能发行。Akka 默认将记录所有失败,但是,有时也想在记录日志这外对失败作出反应, 比如关闭 actor 系统,或者显示的通知外部监视器端点。

Bind failures

For example the server might be unable to bind to the given port. For example when the port is already taken by another application, or if the port is privileged (i.e. only usable by root). In this case the “binding future” will fail immediately, and we can react to it by listening on the FutureCompletionStage’s completion:

例如服务器可能无法绑定以指定的端口。比如当端口已经用于另一个应用程序,或者端口是特权端口(例如:只能 root 使用)。 这种情况下,“绑定 Future”会立即失败,我们可以通过监听 FutureCompletionStage 的完成作出反应:

Scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.Future

object WebServer {
  def main(args: Array[String]) {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future foreach in the end
    implicit val executionContext = system.dispatcher

    val handler = get {
      complete("Hello world!")
    }

    // let's say the OS won't allow us to bind to 80.
    val (host, port) = ("localhost", 80)
    val bindingFuture: Future[ServerBinding] =
      Http().bindAndHandle(handler, host, port)

    bindingFuture.failed.foreach { ex =>
      log.error(ex, "Failed to bind to {}:{}!", host, port)
    }
  }
}
Java

import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.Route; import akka.http.javadsl.Http; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; import java.io.IOException; import java.util.concurrent.CompletionStage; public class HighLevelServerBindFailureExample { public static void main(String[] args) throws IOException { // boot up server using the route as defined below final ActorSystem system = ActorSystem.create(); final ActorMaterializer materializer = ActorMaterializer.create(system); final HighLevelServerExample app = new HighLevelServerExample(); final Route route = app.createRoute(); final Flow<HttpRequest, HttpResponse, NotUsed> handler = route.flow(system, materializer); final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 8080), materializer); binding.exceptionally(failure -> { System.err.println("Something very bad happened! " + failure.getMessage()); system.terminate(); return null; }); system.terminate(); } }
Note

For a more low-level overview of the kinds of failures that can happen and also more fine-grained control over them refer to the Handling HTTP Server failures in the Low-Level API documentation.

有关可能发生失败的更低级概述,以及对它们的更精细控制,请参考 在低级 API 里处理 HTTP 服务器错误 文档。

Failures and exceptions inside the Routing DSL

路由 DSL 内部的失败和异常

Exception handling within the Routing DSL is done by providing ExceptionHandlerExceptionHandler s which are documented in-depth in the Exception Handling section of the documentation. You can use them to transform exceptions into HttpResponseHttpResponse s with appropriate error codes and human-readable failure descriptions.

路由 DSL 中的异常处理是通过提供 ExceptionHandlerExceptionHandler 来完成的,在文档的 Exception Handling 部分对此进入了深入介绍。 你可以使用 ExceptionHandlerExceptionHandler 以合适的错误代码和人类可读的失败描述来转换异常为 HttpResponseHttpResponse

File uploads

文件上传

For high level directives to handle uploads see the FileUploadDirectives.

用于处理上传的高级指令见 FileUploadDirectives

Handling a simple file upload from for example a browser form with a file input can be done by accepting a Multipart.FormData entity, note that the body parts are Source rather than all available right away, and so is the individual body part payload so you will need to consume those streams both for the file and for the form fields.

处理简单文件上传(form)表单,例如带有一个 file 输入框的浏览器(form)表单可以通过接受 Multipart.FormData 实体来完成。 注意,正文部分是 Source 而不是立即可用,因此,单个正文部分载荷也是如此,所以你需要同时对文件和表单字段消费这些流。

Here is a simple example which just dumps the uploaded file into a temporary file on disk, collects some form fields and saves an entry to a fictive database:

这里是一个简单的示例,只是转储上传文件到磁盘上的临时文件里面,收集一些表单字段,并保存条目到一个虚构的数据库:

Scala
val uploadVideo =
  path("video") {
    entity(as[Multipart.FormData]) { formData =>

      // collect all parts of the multipart as it arrives into a map
      val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {

        case b: BodyPart if b.name == "file" =>
          // stream into a file as the chunks of it arrives and return a future
          // file to where it got stored
          val file = File.createTempFile("upload", "tmp")
          b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>
            (b.name -> file))

        case b: BodyPart =>
          // collect form field values
          b.toStrict(2.seconds).map(strict =>
            (b.name -> strict.entity.data.utf8String))

      }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)

      val done = allPartsF.map { allParts =>
        // You would have some better validation/unmarshalling here
        db.create(Video(
          file = allParts("file").asInstanceOf[File],
          title = allParts("title").asInstanceOf[String],
          author = allParts("author").asInstanceOf[String]))
      }

      // when processing have finished create a response for the user
      onSuccess(allPartsF) { allParts =>
        complete {
          "ok!"
        }
      }
    }
  }
Java
import static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.entity;
import static akka.http.javadsl.server.Directives.onSuccess;
import static akka.http.javadsl.server.Directives.path;

  path("video", () ->
  entity(Unmarshaller.entityToMultipartFormData(), formData -> {
    // collect all parts of the multipart as it arrives into a map
    final CompletionStage<Map<String, Object>> allParts =
      formData.getParts().mapAsync(1, bodyPart -> {
        if ("file".equals(bodyPart.getName())) {
          // stream into a file as the chunks of it arrives and return a CompletionStage
          // file to where it got stored
          final File file = File.createTempFile("upload", "tmp");
          return bodyPart.getEntity().getDataBytes()
            .runWith(FileIO.toPath(file.toPath()), materializer)
            .thenApply(ignore ->
              new Pair<String, Object>(bodyPart.getName(), file)
            );
        } else {
          // collect form field values
          return bodyPart.toStrict(2 * 1000, materializer)
            .thenApply(strict ->
              new Pair<String, Object>(bodyPart.getName(),
                strict.getEntity().getData().utf8String())
            );
        }
      }).runFold(new HashMap<String, Object>(), (acc, pair) -> {
        acc.put(pair.first(), pair.second());
        return acc;
      }, materializer);

    // simulate a DB call
    final CompletionStage<Void> done = allParts.thenCompose(map ->
      // You would have some better validation/unmarshalling here
      DB.create((File) map.get("file"),
        (String) map.get("title"),
        (String) map.get("author")
      ));

    // when processing have finished create a response for the user
    return onSuccess(allParts, x -> complete("ok!"));
  })
);

You can transform the uploaded files as they arrive rather than storing them in a temporary file as in the previous example. In this example we accept any number of .csv files, parse those into lines and split each line before we send it to an actor for further processing:

你可以在上传文件到达时转换,而不是上前面例子里存储到一个临时文件里。在这个例子里,我们接受任意数量的 .csv 文件, 将其按行解析并分割每一行,然后发送到 actor 里进行进一步处理。

Scala
val splitLines = Framing.delimiter(ByteString("\n"), 256)

val csvUploads =
  path("metadata" / LongNumber) { id =>
    entity(as[Multipart.FormData]) { formData =>
      val done: Future[Done] = formData.parts.mapAsync(1) {
        case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>
          b.entity.dataBytes
            .via(splitLines)
            .map(_.utf8String.split(",").toVector)
            .runForeach(csv =>
              metadataActor ! MetadataActor.Entry(id, csv))
        case _ => Future.successful(Done)
      }.runWith(Sink.ignore)

      // when processing have finished create a response for the user
      onSuccess(done) { _ =>
        complete {
          "ok!"
        }
      }
    }
  }
Java
import static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.entity;
import static akka.http.javadsl.server.Directives.onComplete;
import static akka.http.javadsl.server.Directives.path;

Route csvUploads() {
  final Flow<ByteString, ByteString, NotUsed> splitLines =
    Framing.delimiter(ByteString.fromString("\n"), 256);

  return path(segment("metadata").slash(longSegment()), id ->
    entity(Unmarshaller.entityToMultipartFormData(), formData -> {

      final CompletionStage<Done> done = formData.getParts().mapAsync(1, bodyPart ->
        bodyPart.getFilename().filter(name -> name.endsWith(".csv")).map(ignored ->
          bodyPart.getEntity().getDataBytes()
            .via(splitLines)
            .map(bs -> bs.utf8String().split(","))
            .runForeach(csv ->
                metadataActor.tell(new Entry(id, csv), ActorRef.noSender()),
              materializer)
        ).orElseGet(() ->
          // in case the uploaded file is not a CSV
          CompletableFuture.completedFuture(Done.getInstance()))
      ).runWith(Sink.ignore(), materializer);

      // when processing have finished create a response for the user
      return onComplete(() -> done, ignored -> complete("ok!"));
    })
  );
}

Configuring Server-side HTTPS

配置服务器端 HTTPS

For detailed documentation about configuring and using HTTPS on the server-side refer to Server-Side HTTPS Support.

有关在服务器端配置和使用 HTTPS 的详细文档,参考 服务器端 HTTPS 支持

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交 Pull Request。