Backpressure(回压)是 Akka Streams 最为核心的功能,是其区别于其它系统的标志性特征。回压主要用于处理数据生产者和消费者的速度不匹配情况,通过不同的数据溢出处理策略来提高系统的稳定性。本文首先介绍 async boundary(异步边界)的概念,然后用代码演示了回压的功能和溢出策略。

1 Async Boundar

Akka Streams 的一个流处理中的 Operators 可以在运行在不同的线程甚至不同的 JVM 里面,一个线程就是一个 async boundary(异步边界)。从 Akka Streams 2.5 开始,当组合 operators 时,如果不调用 async() 方法,这些 operators 会运行在同一个 async boundary,也就是同一个线程里面。这种优化 Akka Streams 称其为 Fusion (聚合)。Fusion 模式时上下游 Operators 之间直接用共享内存访问 elements,每一个 element 处理完成后再处理下一个 element,不要流量控制机制。比如下面的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.time.format.DateTimeFormatter
import java.time.LocalTime

object TestStreams extends App {
  implicit val system = ActorSystem("testStreams")
  implicit val ec = scala.concurrent.ExecutionContext.global
  val dataFormat = DateTimeFormatter.ofPattern("hh:mm:ss:SSS")

  val source = Source(1 to 3)

  private def getTime() = {
    LocalTime.now().format(dataFormat)
  }

  private def flowFactory(name: String) = {
    Flow[Int].map(element => {
      Thread.sleep(1000)
      val threadName = Thread.currentThread().getName()
      val now = getTime()
      println(s"Flow-${name}: ${threadName} [${element}] ${now}")
      element
    })
  }

  val flowA = flowFactory("A")
  val flowB = flowFactory("B")
  val flowC = flowFactory("C")

  println(s"Start at ${getTime()}")
  val result = source
    .via(flowA)
    .via(flowB)
    .via(flowC)
    .runWith(Sink.ignore)

  result.onComplete(_ => {
    println(s"End at ${getTime()}")
    system.terminate()
  })
}

其输出为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Start at 07:20:47:823
Flow-A: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:48:847
Flow-B: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:49:852
Flow-C: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:50:855
Flow-A: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:51:858
Flow-B: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:52:862
Flow-C: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:53:862
Flow-A: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:54:864
Flow-B: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:55:865
Flow-C: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:56:869
End at 07:20:56:871

上例中用 Thread.sleep 来模拟长时间的处理,生产代码中应该尽可能避免这种 blocking call 阻塞调用。可以看到所有 Operators 运行在同一个线程,处理完一个 element 再处理下一个,没有流量控制的必要。对于程序中模拟的长时间处理,由于只用到一个线程,一个接一个串行处理每个元素。处理一个需要 3 秒,总耗时 9 秒。如果各个 Operator 运行在不同的异步边界,则各个 Operator 异步并发在不同的线程执行。创建异步边界非常简单,只需要在 Operator 后面调用 async 即可。如下面程序所示:

1
2
3
4
5
6
7
8
val result = source
  .via(flowA)
  .async
  .via(flowB)
  .async
  .via(flowC)
  .async
  .runWith(Sink.ignore)

此时的运行结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Start at 07:24:42:062
Flow-A: testStreams-akka.actor.default-dispatcher-9 [1] 07:24:43:091
Flow-A: testStreams-akka.actor.default-dispatcher-9 [2] 07:24:44:097
Flow-B: testStreams-akka.actor.default-dispatcher-7 [1] 07:24:44:097
Flow-B: testStreams-akka.actor.default-dispatcher-7 [2] 07:24:45:101
Flow-C: testStreams-akka.actor.default-dispatcher-8 [1] 07:24:45:101
Flow-A: testStreams-akka.actor.default-dispatcher-9 [3] 07:24:45:101
Flow-C: testStreams-akka.actor.default-dispatcher-8 [2] 07:24:46:101
Flow-B: testStreams-akka.actor.default-dispatcher-6 [3] 07:24:46:102
Flow-C: testStreams-akka.actor.default-dispatcher-7 [3] 07:24:47:105
End at 07:24:47:106

每一个 async 调用创建一个新的异步边界,底层的实现是运行在不同的 Akka Actor 上。可以看到每个 Operator 运行在不同的线程并发处理收到的数据。总耗时从大约 9 秒降低到大约 5 秒。值得说明的是,如同其它的多线程编程,需要仔细衡量异步边界切换的开销和任务的性质决定是否采用异步并发。比如上面例子中,当去掉 Thread.sleep 和打印,对一百万 elements,在我的 Macbook Pro 笔记本电脑上,Fusion 单线程费时 0.22 秒而多线程版本费时约 2 秒。

重要的是,对 Akka Streams 而言,当不同 Operators 在不同的异步边界时,Operator 之间的连接会引入 buffer 和 backpressure。数据单元的相对顺序保持不变。

2 Buffer 和 Backpressure

Akka Streams 的回压机制是 pull-based(拉驱动),每次数据发送都由下游消费者发起请求,由下游的数据消费者指定上游的数据发送者每次可以发送的数据量。如上节所述,这种回压机制只存在于不同的异步边界上,通过缓存和异步控制消息实现。Akka Streams 中每个异步边界运行在不同的 Akka Actor 上。二个异步边界的连接在下游一侧有一个缺省值为 16 的 buffer。这个值是 element 的个数,大的 element 数据类型会占用更多内存。可以通过参数设置改变这个值 akka.stream.materializer.max-input-buffer-size = 16

在开始运行时,上游的 Operator 会等待下游的 Operator 给出明确的 Pull 特定数目的 element 请求时才 Push 推送相应数目的 elements。初始时下游会请求其缓存大小的 elements,只有当缓存有空余时(缺省是有一半空了)才会向上游发出新的请求。这种回压控制是通过异步消息方式进行的,和正常的数据流向相反,这是 backpressure(回压)back(回)字的由来。回压控制是端到端的,从最后的数据终点(Sink)出发,从下游到上游,依次决定各个中间处理环节直到数据源点(Source)的数据发送量。所以整体的数据发送速率是整个流中最慢节点的数据请求速率。这种回压控制是底层机制,对应用程序员是透明的。唯一的例外就是当慢的消费者面对快的数据生产者而无法控制数据发送速率时,需要指明对来不及处理的数据单元的处理策略,Akka Streams 对此提供了丰富的处理策略。

为了方便演示,我们把 Buffer Size 值调整为 4。在 src/main/resources/application.conf 里设置:akka.stream.materializer.max-input-buffer-size = 4。下面的程序演示了在二个异步边界的回压机制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.time.format.DateTimeFormatter
import java.time.LocalTime

object TestStreams {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global
    val dataFormat = DateTimeFormatter.ofPattern("hh:mm:ss:SSS")

    def getTime() = {
      LocalTime.now().format(dataFormat)
    }

    val source = Source(1 to 10)
    val flow = Flow[Int].map { element =>
      println(s"Flow: [${element}] ${getTime()}")
      element
    }
    val sink = Sink.foreach[Int](element => {
      Thread.sleep(1000)
      println(s"Sink: [${element}] ${getTime()}")
    })

    println(s"Start at ${getTime()}")
    val result = source.async.via(flow).async.runWith(sink)

    result.onComplete(_ => {
      println(s"End at ${getTime()}")
      system.terminate()
    })
  }
}

上面程序的输出如下,为了方便分析,人工加入了空行分隔输出内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Start at 02:22:51:908

Flow: [1] 02:22:51:937
Flow: [2] 02:22:51:938
Flow: [3] 02:22:51:938
Flow: [4] 02:22:51:938

Sink: [1] 02:22:52:941

Flow: [5] 02:22:52:942
Flow: [6] 02:22:52:942

Sink: [2] 02:22:53:944
Sink: [3] 02:22:54:950

Flow: [7] 02:22:54:951
Flow: [8] 02:22:54:951

Sink: [4] 02:22:55:951
Sink: [5] 02:22:56:957

Flow: [9] 02:22:56:957
Flow: [10] 02:22:56:958

Sink: [6] 02:22:57:958
Sink: [7] 02:22:58:960
Sink: [8] 02:22:59:966
Sink: [9] 02:23:00:969
Sink: [10] 02:23:01:972

End at 02:23:01:975

数据的流动始于 sink 在实体化时开始请求数据。可以看到,除了最初时一次收到四个单元 (max-input-buffer-size = 4),flow 随后每次只收到 Buffer Size 的一半,即二个数据单元,而且都是在 sink 处理了二个数据单元之后。一个例外是因为有数据缓存,sink 第一次只处理了一个数据单元后,flow 就接到通知并开始请求二个数据单元。此后都是sink 处理了二个,再发通知给 flow 请求二个数据单元,flow 随后再发通知给 source 请求二个数据单元。

3 溢出策略

回压只在快的生产者和慢的消费者之间起作用。在慢的生产者和快的消费者之间则无需担心。其实对于快的生产者和慢的消费者引起的数据溢出问题,回压只是其中的一种策略,是 Akka Streams 的所有处理单元的一种缺省的溢出处理策略。

Akka Streams 的每一个异步边界都运行在一个单独的 Akka Actor 里面。其 inlet(入口)都有一个 InputBuffer 输入缓存。akka.stream.materializer.init-input-buffer-sizeakka.stream.materializer.max-input-buffer-size 这二个参数指定了初始的缓存大小和最大的缓存大小,尺寸是数据单元的个数,其缺省值分别为 4 和 16。每一个 Runnable Graph 或处理单元也可以分别指定缓存的尺寸以及其溢出策略。

比如上面的代码中,如果如下设定 flow 的缓存和溢出策略,其它代码不变,则产生不同的输出。

1
2
3
4
val result = source.async
  .via(flow.buffer(2, OverflowStrategy.dropHead))
  .async
  .runWith(sink)

其输出如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Start at 11:04:43:518
Flow: [1] 11:04:43:553
Flow: [2] 11:04:43:553
Flow: [3] 11:04:43:553
Flow: [4] 11:04:43:554
Flow: [5] 11:04:43:554
Flow: [6] 11:04:43:554
Flow: [7] 11:04:43:555
Flow: [8] 11:04:43:555
Flow: [9] 11:04:43:555
Flow: [10] 11:04:43:556

Sink: [1] 11:04:44:556
Sink: [2] 11:04:45:557
Sink: [3] 11:04:46:562
Sink: [4] 11:04:47:567

Sink: [9] 11:04:48:568
Sink: [10] 11:04:49:573
End at 11:04:49:576

flow 不再使用回压策略,而采用了 dropHead,即丢弃旧数据的策略。其缓存尺寸为 2。最开始的 4 个数据都基于 sink(其缓存尺寸保持系统设定的 4 个数据单元)的第一个请求全部发送出去了。随后的数据,除了最新的 2 个,其它全部丢弃了。

下面是 Akka Streams 支持的溢出策略

  • OverflowStrategy.backpressure:回压。这是缺省的溢出策略。当缓存满了则不从上游 pull(拉)数据,或者说不请求新数据。当缓存有一半为空时,再从上游请求新数据,请求个数为缓存尺寸的一半。
  • OverflowStrategy.dropBuffer:丢弃缓存。当缓存满了,有新的数据单元,则丢弃整个缓存数据。
  • OverflowStrategy.dropHead:丢弃缓存旧数据。当缓存满了,有新的数据单元,则丢弃最老的数据单元。
  • OverflowStrategy.dropNew:丢弃新数据单元。
  • OverflowStrategy.dropTail: 丢弃缓存新数据。当缓存满了,有新的数据单元,则丢弃缓存中最新的数据单元。
  • OverflowStrategy.fail:失败。当缓存满了,有新的数据单元时,completes the streams(结束流)并返回一个错误。

小结

除了包含了 backpressure(回压)的 overflow strategy(溢出策略),Akka Streams 还提供了丰富的流量控制功能,包括 rate limit(限速),aggretation(聚合),throttle(节流阀),以及 extrapolate(插值)等。这些额外的流量控制功能在 SourceFlow 的 API 文档都有详细的说明和例子。