Akka Streams Hello World
Akka Streams 允許你輕鬆建立利用 Akka 框架功能的流,而無需明確定義 actor 行為和訊息。每個流將至少有一個 Source
(資料的來源)和至少一個 Sink
(資料的目的地)。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File
val stream = Source(Seq("test1.txt", "test2.txt", "test3.txt"))
.map(new File(_))
.filter(_.exists())
.filter(_.length() != 0)
.to(Sink.foreach(f => println(s"Absolute path: ${f.getAbsolutePath}")))
在這個快速示例中,我們有一個我們輸入到流中的檔名。首先我們將它們對映到 File
,然後我們過濾掉不存在的檔案,然後是長度為 0 的檔案。如果檔案通過過濾器,它將列印到 stdout
。
Akka 流還允許你以模組化方式執行流。你可以使用流的部分模組建立 Flow
s。如果我們採用相同的例子,我們也可以這樣做:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File
implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
val stream = source
.via(mapper)
.via(existsFilter)
.via(lengthZeroFilter)
.to(sink)
stream.run()
在第二個版本中,我們可以看到 mapper
,existsFilter
,lengthZeroFilter
是 Flow
s。你可以使用 via
方法在流中組合它們。此功能允許你重用你的程式碼片段。值得一提的是,Flow
s 可以是無國籍的或有狀態的。在有狀態的情況下,重用它們時需要小心。
你也可以將流視為 Graphs
。Akka Streams 還提供了一個強大的功能,以簡單的方式定義複雜的流。我們可以做同樣的例子:
import java.io.File
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
source ~> mapper ~> existsFilter ~> lengthZeroFilter ~> sink
ClosedShape
})
graph.run()
也可以使用 GraphDSL
建立聚合流。例如,如果我們想將 mapper 和兩個過濾器組合在一起,我們可以做到:
val combinedFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val mapper = builder.add(Flow[String].map(new File(_)))
val existsFilter = builder.add(Flow[File].filter(_.exists()))
val lengthZeroFilter = builder.add(Flow[File].filter(_.length() != 0))
mapper ~> existsFilter ~> lengthZeroFilter
FlowShape(mapper.in, lengthZeroFilter.out)
})
然後將其用作單個塊。combinedFlow
將是 FlowShape
或 PartialGraph
。我們可以用 via
為例:
val stream = source
.via(combinedFlow)
.to(sink)
stream.run()
或者使用 GraphDSL
:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))
source ~> combinedFlow ~> sink
ClosedShape
})
graph.run()