Akka Streams 提供了比较丰富的 Shpae 形状构件来连接数据处理组件。其组合方法比较类似,都是通过创建不同形状的构件之后再按需要连接输入输出端口进行组合。如果已有的 Shape 形状不能满足要求,Graph DSL 允许自定义 Shape 来任意组合组件。同 Flow DSL 一样,Graph DSL 也提供机制指定要保留的实体化值。

1 Shape

Akka Streams 的 Shpae 形状用于定义连接的输入输出端口。从 Shape 文档 可以看出只是包含了 Seq[Inlet[_]] 一组入口和 Seq[OutLet[_]] 一组出口。其本身没有什么处理逻辑,需要在内部通过其它构件提供功能,对外则给出了定义的一组入口和一组出口。所有的 Graph 图都包含了一个 Shape 对象,用于指定其在流中的连接方式。比起 Shape, Graph 还包含了很多属性,比如异步边界属性,缓存属性以及日志属性等。下面是 Akka Streams 提供的形状及其对应的 Graph 组件:

  • ClosedShape: 无输入,无输出。对应一个 RunnableGraph 图。
  • SourceShape[T]: 无输入,单输出。对应一个 Source 图。
  • FlowShape[I, O]: 单输入,单输出。 对应一个 Flow 图。
  • SinkShape[T]: 单输入,无输出。 对应一个 Sink 图。
  • UniformFanOutShape[I, O]: 单输入,多输出(同类型)。 对应的图有 Broadcast, Partition, Balance
  • UniformFanInShape[I, O]: 多输入(同类型),单输出。对应的图有:Merge, MergePreferred MergePrioritiezed, InterLeave, Concat 等。
  • FanInShape2, FanInShape3,,,: 多输入(多类型),单输出。对应的图有 Zip
  • FanOutShape2FanOutShape3,,,: 单输入,多输出(多类型)。对应的图有 Unzip
  • BidiShape: 双输入(不同类型),双输出(不同类型)。对应的图有 BidiFlow

具有 SourceShape[T], FlowShape[I, O]SinkShape[T] 形状的图可以通过 Flow DSL 的线性组合方式 (viaMat or toMat) 来使用。如果一个 Graph (已有的或应用定制的) 有多个的输入或输出,除了一些特例,通常无法通过 Flow DSL 的线性组合方式来使用。有多个输入或多个输出的图很多时候需要使用 Graph DSL 通过非线性组合方式生成 RunnableGraph ,然后实体化运行。这也是我们区分这二种 DSL 的初衷。

用 Flow DSL 处理多输入或多输出的特例情况是可以进行简单组合的一些场景,其本质还是 Graph DSL 一些常用模式的的简化。比如 Source.combine 方法的文档 给出了下面的例子:

1
2
3
4
5
val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val combined = Source.combine(source1, source2, source3)(Merge(_))
combined.runForeach(println)

该方法的签名为

1
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]

可以看到,上面方法的第二个参数都是 strategy: Int => Graph[UniformFanInShape[T, U], NotUsed],即用到一个多输入单输出的图。可以使用的构件包括 Merge, Concat, ZipN, ZipWithN 等。除了 ZipWithN 可以带一个函数对象,其它的都是简单的合并或拉链操作。

2 定制 Shape

可以看到系统提供的 Shape,除了 ClosedShapeBidiShape 这二个特例外, 都是单输入或单输出的。当系统提供的 Shape 不能满足要求时,Akka Streams 允许开发者定义多输入多输出的 Shape 来连接组件。例如一个二个输入口,三个输出口的形状就只能定制。Akka Streams 提供了简单的定制方法: 只需要继承抽象的 Shape class 并直截了当给出三个抽象成员的实现即可。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
case class Shape2By3(
    in0: Inlet[Int],
    in1: Inlet[Int],
    out0: Outlet[Int],
    out1: Outlet[Int],
    out2: Outlet[Int]
) extends Shape {

  override def inlets: Seq[Inlet[_]] = List(in0, in1)
  override def outlets: Seq[Outlet[_]] = List(out0, out1, out2)
  override def deepCopy(): Shape = Shape2By3(
    in0.carbonCopy(),
    in1.carbonCopy(),
    out0.carbonCopy(),
    out1.carbonCopy(),
    out2.carbonCopy()
  )
}

其使用方法和系统提供的 Shape 一样,用 Graph DSL 将其输入和输出端口通过其它构件连接起来。下面例子用一个二个输入口的 Merge 和一个三个输出口的 Balance 在内部连接,将二个输入流均衡的分配到三个输出口,对外则形成一个带有二个输入口和三个输出口的构件。最后再通过构建 RunnableGraph 来运行。 完整的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import scala.util.Success
import akka.actor.ActorSystem
import akka.NotUsed
import akka.stream.{ClosedShape, Inlet, Outlet, Shape}
import akka.stream.scaladsl.{Balance, GraphDSL, Merge, RunnableGraph, Sink, Source, Zip}

// Step 1 定制形状
case class Shape2By3(
    in0: Inlet[String],
    in1: Inlet[String],
    out0: Outlet[String],
    out1: Outlet[String],
    out2: Outlet[String]
) extends Shape {

  override def inlets: Seq[Inlet[_]] = List(in0, in1)
  override def outlets: Seq[Outlet[_]] = List(out0, out1, out2)
  override def deepCopy(): Shape = Shape2By3(
    in0.carbonCopy(),
    in1.carbonCopy(),
    out0.carbonCopy(),
    out1.carbonCopy(),
    out2.carbonCopy()
  )
}

object Demo2By3 {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    // Step 2 创建连接构件
    val graph2By3 = GraphDSL.create() {
      implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._
        val merge = builder.add(Merge[String](2))
        val balance = builder.add(Balance[String](3))

        merge ~> balance

        Shape2By3(
          merge.in(0),
          merge.in(1),
          balance.out(0),
          balance.out(1),
          balance.out(2)
        )
    }

    // Step 3 基本组件
    import scala.concurrent.duration._
    val source1 = Source.repeat("Repeat").throttle(3, 1.second).take(7)
    val source2 = Source.tick(0.second, 1.second, "Tick").take(3)

    val sink1 = Sink.foreach[String](message => println(s"Sink 1: ${message}"))
    val sink2 = Sink.foreach[String](message => println(s"Sink 2: ${message}"))
    val sink3 = Sink.foreach[String](message => println(s"Sink 3: ${message}"))

    // Step 4 创建 RunnableGraph
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._

        val graph2By3Shape = builder.add(graph2By3)
        source1 ~> graph2By3Shape.in0
        source2 ~> graph2By3Shape.in1
        graph2By3Shape.out0 ~> sink1
        graph2By3Shape.out1 ~> sink2
        graph2By3Shape.out2 ~> sink3

        ClosedShape
      }
    ) // RunnableGrpah

    graph.run()
  }
}

/* 输出结果
Sink 1: Repeat
Sink 2: Tick
Sink 3: Repeat
Sink 1: Repeat
Sink 2: Tick
Sink 3: Repeat
Sink 1: Repeat
Sink 2: Repeat
Sink 3: Repeat
Sink 1: Tick
 */

3 生成实体化值

用于创建图的 GraphDSL.create 方法是一个重载方法,除了第一个签名(固定了实体化值为 NotUsed), 后续的签名在创建函数参数列表之前都有二个非空的参数列表:

1
2
3
4
5
6
7
def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat]

def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]

// 直到 g1, g2, ..., g22

多出来的参数列表一个用于指定传入的一个或多个 Graph 参数,另一个指定相应的实体化值转换函数。下面是一个具体例子,关键的不同就是 GraphDSL.create(sink1, sink2)(saveMats){ implicit builder => (sum, count) =>...}。可以看到,Graph DSL 可以利用传入的图的实体化值生成新的实体化值。注意在创建时需要使用作为内部函数参数的 sumcount 变量,否则会因为端口没有连接产生编译错误。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import scala.concurrent.Future
import akka.actor.ActorSystem

import akka.stream.scaladsl.{Broadcast, GraphDSL, Keep, Sink, Source}
import akka.stream.SinkShape

object MatDemo {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val sink1 = Sink.reduce[Int](_ + _)
    val sink2 = Sink.fold[Int, Int](0)((count, _) => count + 1)

    def saveMats(mat1: Future[Int], mat2: Future[Int]) =
      for {
        v1 <- mat1
        v2 <- mat2
      } yield (v1.toDouble / v2)

    val sink = Sink.fromGraph(GraphDSL.create(sink1, sink2)(saveMats) {
      implicit builder => (sum, count) =>
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[Int](2))
        broadcast ~> sum
        broadcast ~> count
        SinkShape(broadcast.in)
    })

    val mats = Source(1 to 10).runWith(sink)

    mats.onComplete { value =>
      println(value)
      system.terminate()
    }
  }
}

// Output: Success(5.5)

4 把实体化值转为数据单元

Akka Streams 还有一个很有意思的功能:可以在创建 Graph 的时候把实体化值转换为输出的流数据单元。把上面例子稍做改变,就可以把实体化值转换为流数据,相关部分的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
val flow = Flow.fromGraph(GraphDSL.create(sink1, sink2)(saveMats) {
  implicit builder => (sum, count) =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Int](2))
    broadcast ~> sum
    broadcast ~> count

    // 把实体化值生成一个输出口,多次调用会生成多个
    val materialOut: Outlet[Future[Double]] = builder.materializedValue
    FlowShape(broadcast.in, materialOut.outlet)
})

val mats = Source(1 to 10)
  .via(flow)
  .runForeach(_.onComplete(println))

mats.onComplete {
  case _ => system.terminate()
}

可以看到,val materialOut: Outlet[Future[Double]] = builder.materializedValue 每次调用会产生一个 Outlet,即一个输出端口,其值为当前 Graph 的实体化值 Future[Double]。 由于多出了一个输出端口,产生的图类型也变成了一个 Flow 类型。Graph 实体化文档 给出了更多说明和例子。 如文档所言,这个输出值是实体化的值,要避免内部引用形成循环。