Flow DSL 和 Graph DSL 的目的是灵活的连接各种组件,主要是通过 Shape 来操作的图的连接拓扑结构。如果需要实现类似 Merge, Zip 这种内置了数据处理功能的构件或操作符,需要定制 GraphStage (翻译成“图步”)来完成。GraphStageGraph 的子类,在其形状和属性基础上定义了入口和出口的数据处理逻辑。从下面的例子可以看到,Akka Streams 的基本处理逻辑是 pull-based back pressure 拉驱动的回压机制。通过实现定制 GraphStage 有助于深刻理解 Akka Streams 的实现机制。本文用由简到繁的几个例子加以演示和说明,总结放在下一章。

所有的例子和图形都基于或来自 Custom stream processing 官方文档,下面不另做说明。

1 一个定制的数据源点

创建一个 GraphStage 类需要给出 Shape 的定义和处理逻辑。Shape 用于指定用到的入口 (Inlet) 和出口 (Outlet)。一个数据源没有入口,通常也只有一个出口。利用已有的 SourceShape,可以定义如下:

1
2
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)

数据处理逻辑需要重载 createLogic() 方法来创建一个新的 GraphStageLogic 对象。这是非常关键的一步,因为每次实体化的时候都会调用这个方法来产生新的GraphStageLogic 对象,所有的相关数据和处理逻辑都需要封装在这个对象里面。对于数据源,只需要指定出口的 onPull callback 回调函数即可。完整的演示代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 1
      val outHandler = new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      }
      setHandler(out, outHandler)
    }
}

object Test {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val source = Source.fromGraph(new NumbersSource)
    source.take(3).runForeach(println)

  }
}
/* output
1
2
3
 */

本例子中在 new GraphStageLogic(shape) {...} 的表达式中定义了一个计数器,然后在 OutHandler 里面的 onPull() 方法里调用 push 方法数据推出,然后计数器加一。下游的 pull 请求会触发上游的 push 方法调用来发送数据。这里可以看到回压机制的实现。因为 GraphStage 是 Graph 的子类,没有运行时需要的 Attribute 属性配置,所以用 Sink.fromGraph 方法来创建一个新的数据源点。

2 定制数据终点

终点的 Shape 没有出口,只有入口。其处理逻辑在于重载 onPush 方法,用 grab(in) 方法从入口取得数据后,需要调用 pull(in) 请求后续数据。需要注意的是,Akka Streams 的流是 pull-based,所以数据终点需要在开始时先 pull(in) 来启动整个数据流。 一个简单的打印收到数据的定制终点程序如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import akka.actor.ActorSystem
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

class PrintSink extends GraphStage[SinkShape[Int]] {
  val in: Inlet[Int] = Inlet("PrintSink")
  override val shape: SinkShape[Int] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // 启动时发出初始请求
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          println(grab(in))
          pull(in)
        }
      })
    }
}

object SinkTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val sink = Sink.fromGraph(new PrintSink)
    Source(1 to 3).runWith(sink)
  }
}

/* output
1
2
3
 */

创建的 GraphStage 是 Graph 的子类,没有运行时需要的 Attribute 属性配置,所以用 Sink.fromGraph 方法来创建一个新的数据终点。

3 一个定制的 Duplicator Flow

一个 Flow 具有一个入口和一个出口。内置的处理逻辑需要接受入口数据和产生出口数据,同时可能还要处理上下游完成或出错的情况。比如一个定制的 Duplicator Flow,复制上游的每个数据。当上游完成时,也需要尝试把缓存的数据发送给下游。下面的例子很好的解释来入口和出口的处理逻辑及其交互。类似的,创建的 GraphStage 是 Graph 的子类,没有运行时需要的 Attribute 属性配置,所以用 Flow.fromGraph 方法来创建一个新 Flow。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import akka.actor.ActorSystem

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.scaladsl.{Flow, Source}

class Duplicator[A] extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Duplicator.in")
  val out = Outlet[A]("Duplicator.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // 可变状态必须定义在 GraphStageLogic 里面
      var lastElem: Option[A] = None

      val inHandler = new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          lastElem = Some(elem)
          push(out, elem)
        }

        // 上游结束时尝试发送缓存的数据
        override def onUpstreamFinish(): Unit = {
          if (lastElem.isDefined) emit(out, lastElem.get)
          complete(out)
        }

      }
      setHandler(in, inHandler)

      val outHanlder = new OutHandler {
        override def onPull(): Unit = {
          if (lastElem.isDefined) {
            push(out, lastElem.get)
            lastElem = None
          } else {
            pull(in)
          }
        }
      }
      setHandler(out, outHanlder)
    }
}

object DuplicatorTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val source = Source(1 to 3)
    val flow = Flow.fromGraph(new Duplicator[Int])
    source.via(flow).runForeach(println)
  }
}

/* output
1
1
2
2
3
3
 */

4 返回实体化值

当需要返回一个实体化值时,需要重载 GraphStageWithMaterializedValue 类的 createLogicAndMaterializedValue 方法。改方法返回一个包含二个元素的 tuple:(GraphStageLogic, Future[A])。下面的例子把流中的第一个数据单元作为实体化值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import scala.concurrent.{Future, Promise}
import akka.actor.ActorSystem

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{
  GraphStageLogic,
  GraphStageWithMaterializedValue,
  InHandler,
  OutHandler
}
import akka.stream.scaladsl.{Flow, Keep, Source, Sink}

class FirstValue[A]
    extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {

  val in = Inlet[A]("FirstValue.in")
  val out = Outlet[A]("FirstValue.out")
  val shape = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes
  ): (GraphStageLogic, Future[A]) = {
    val promise = Promise[A]()
    val logic = new GraphStageLogic(shape) {

      val inHandler = new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          promise.success(elem)
          push(out, elem)

          // 第一次之后就直接转发数据
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              push(out, grab(in))
            }
          })
        }
      }
      setHandler(in, inHandler)

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })

    }

    (logic, promise.future)
  }
}

object MatGraphTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val source = Source(1 to 3)
    val flow = Flow.fromGraph(new FirstValue[Int])
    val result =
      source.viaMat(flow)(Keep.right).toMat(Sink.ignore)(Keep.left).run()

    result.onComplete(println)
  }
}

/* output
Success(1)
 */

本例子中在实体化时可以很早就拿到这个数据。如果取最后一个数据单元作为实体化值,则只有在整个流结束后,Future 才会有结果。所以实体化值是相对独立的异步运算结果,可以是和流数据相关或无关的任何数据类型。