TwoThreeShape
一个简单的示例,说明如何定义具有 2 个入口和 3 个出口的自定义形状。
case class TwoThreeShape[-In1, -In2, +Out1, +Out2, +Out3](
in1: Inlet[In1@uncheckedVariance],
in2: Inlet[In2@uncheckedVariance],
out1: Outlet[Out1@uncheckedVariance],
out2: Outlet[Out2@uncheckedVariance],
out3: Outlet[Out3@uncheckedVariance]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2)
override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2, out3)
override def deepCopy(): TwoThreeShape[In1, In2, Out1, Out2, Out3] =
TwoThreeShape(in1.carbonCopy(),
in2.carbonCopy(),
out1.carbonCopy(),
out2.carbonCopy(),
out3.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.size == 2, s"proposed inlets [${inlets.mkString(", ")}] do not fit TwoThreeShape")
require(outlets.size == 3, s"proposed outlets [${outlets.mkString(", ")}] do not fit TwoThreeShape")
TwoThreeShape(inlets(0), inlets(1), outlets(0), outlets(1), outlets(2))
}
}
这种奇怪形状的一个示例用法:一个阶段,它将传递 2 个流的元素,同时保持流中传递的元素数量的比率:
def ratioCount[X,Y]: Graph[TwoThreeShape[X,Y,X,Y,(Int,Int)],NotUsed] = {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val x = b.add(Broadcast[X](2))
val y = b.add(Broadcast[Y](2))
val z = b.add(Zip[Int,Int])
x.out(1).conflateWithSeed(_ => 1)((count,_) => count + 1) ~> z.in0
y.out(1).conflateWithSeed(_ => 1)((count,_) => count + 1) ~> z.in1
TwoThreeShape(x.in,y.in,x.out(0),y.out(0),z.out)
}
}