Akka Streams Hello World
Akka Streams 允许你轻松创建利用 Akka 框架功能的流,而无需明确定义 actor 行为和消息。每个流将至少有一个 Source
(数据的来源)和至少一个 Sink
(数据的目的地)。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File
val stream = Source(Seq("test1.txt", "test2.txt", "test3.txt"))
.map(new File(_))
.filter(_.exists())
.filter(_.length() != 0)
.to(Sink.foreach(f => println(s"Absolute path: ${f.getAbsolutePath}")))
在这个快速示例中,我们有一个我们输入到流中的文件名。首先我们将它们映射到 File
,然后我们过滤掉不存在的文件,然后是长度为 0 的文件。如果文件通过过滤器,它将打印到 stdout
。
Akka 流还允许你以模块化方式执行流。你可以使用流的部分模块创建 Flow
s。如果我们采用相同的例子,我们也可以这样做:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File
implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
val stream = source
.via(mapper)
.via(existsFilter)
.via(lengthZeroFilter)
.to(sink)
stream.run()
在第二个版本中,我们可以看到 mapper
,existsFilter
,lengthZeroFilter
是 Flow
s。你可以使用 via
方法在流中组合它们。此功能允许你重用你的代码片段。值得一提的是,Flow
s 可以是无国籍的或有状态的。在有状态的情况下,重用它们时需要小心。
你也可以将流视为 Graphs
。Akka Streams 还提供了一个强大的功能,以简单的方式定义复杂的流。我们可以做同样的例子:
import java.io.File
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
source ~> mapper ~> existsFilter ~> lengthZeroFilter ~> sink
ClosedShape
})
graph.run()
也可以使用 GraphDSL
创建聚合流。例如,如果我们想将 mapper 和两个过滤器组合在一起,我们可以做到:
val combinedFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val mapper = builder.add(Flow[String].map(new File(_)))
val existsFilter = builder.add(Flow[File].filter(_.exists()))
val lengthZeroFilter = builder.add(Flow[File].filter(_.length() != 0))
mapper ~> existsFilter ~> lengthZeroFilter
FlowShape(mapper.in, lengthZeroFilter.out)
})
然后将其用作单个块。combinedFlow
将是 FlowShape
或 PartialGraph
。我们可以用 via
为例:
val stream = source
.via(combinedFlow)
.to(sink)
stream.run()
或者使用 GraphDSL
:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
source ~> combinedFlow ~> sink
ClosedShape
})
graph.run()