TwoThreeShape
A simple example showing how to define a custom shape with 2 inlets and 3 outlets.
import akka.stream.{Inlet, Outlet, Shape}
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable

case class TwoThreeShape[-In1, -In2, +Out1, +Out2, +Out3](
    in1: Inlet[In1 @uncheckedVariance],
    in2: Inlet[In2 @uncheckedVariance],
    out1: Outlet[Out1 @uncheckedVariance],
    out2: Outlet[Out2 @uncheckedVariance],
    out3: Outlet[Out3 @uncheckedVariance]) extends Shape {

  // The port order here must match the order that copyFromPorts expects.
  override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2)
  override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2, out3)

  // Create a shape of the same structure with fresh copies of every port.
  override def deepCopy(): TwoThreeShape[In1, In2, Out1, Out2, Out3] =
    TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy())

  // Rebuild the shape from the given ports after checking the port counts.
  override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
    require(inlets.size == 2, s"proposed inlets [${inlets.mkString(", ")}] do not fit TwoThreeShape")
    require(outlets.size == 3, s"proposed outlets [${outlets.mkString(", ")}] do not fit TwoThreeShape")
    TwoThreeShape(inlets(0), inlets(1), outlets(0), outlets(1), outlets(2))
  }
}
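An example use of this odd shape: a stage that passes the elements of 2 streams through unchanged while tracking the ratio of how many elements flowed through each. The third outlet emits (Int, Int) pairs of element counts, one count per stream: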
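import akka.NotUsed
import akka.stream.Graph
import akka.stream.scaladsl.{Broadcast, GraphDSL, Zip}

def ratioCount[X, Y]: Graph[TwoThreeShape[X, Y, X, Y, (Int, Int)], NotUsed] = {
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // Duplicate each input: output 0 is passed through, output 1 is counted.
    val x = b.add(Broadcast[X](2))
    val y = b.add(Broadcast[Y](2))
    val z = b.add(Zip[Int, Int]())

    // Count the elements that arrive on each side until the Zip asks for the next value.
    x.out(1).conflateWithSeed(_ => 1)((count, _) => count + 1) ~> z.in0
    y.out(1).conflateWithSeed(_ => 1)((count, _) => count + 1) ~> z.in1

    TwoThreeShape(x.in, y.in, x.out(0), y.out(0), z.out)
  }
}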
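
The counting branches rely on the two functions passed to conflateWithSeed: the seed turns the first pending element into the count 1, and the aggregate adds 1 for every further element that arrives before downstream asks for a value. The following is a minimal plain-Scala sketch of that folding behaviour, with no Akka dependency. The object name ConflateCountSketch and the helper batchCount are hypothetical names used only for illustration, and the sketch models a single batch of pending elements rather than real backpressure.

```scala
object ConflateCountSketch {
  // The same two functions that ratioCount passes to conflateWithSeed.
  def seed[A](first: A): Int = 1
  def aggregate[A](count: Int, next: A): Int = count + 1

  // Hypothetical helper: folds one batch of elements that piled up while
  // downstream was busy. An empty batch yields no value at all.
  def batchCount[A](batch: Seq[A]): Option[Int] = batch match {
    case Seq()         => None
    case first +: rest => Some(rest.foldLeft(seed(first))((count, next) => aggregate(count, next)))
  }

  def main(args: Array[String]): Unit = {
    println(batchCount(List("a", "b", "c"))) // prints Some(3)
    println(batchCount(List(42)))            // prints Some(1)
    println(batchCount(Nil))                 // prints None
  }
}
```

Under this reading, each (Int, Int) pair from the third outlet holds the number of elements each stream delivered since the previous pair, and no pair is produced until both streams have delivered at least one element.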