Future 是 Scala 自带的异步并发机制。如果数据处理非常费时或是远程的,那么通过 Future 来运行这些操作是非常常见的做法。Akka Streams 提供了方便的 API 来执行这种异步操作。由于 Akka Streams 是基于 Akka Actor 的,另外一种常用的异步操作是和 Actor 的互操作,其机制与 Future 的互操作原理相近,不做单独介绍。

1 单独的线程池

对于异步操作,最好根据应用的特点使用单独定制的线程池。Daniel Spiewak 的文章 给出了很好的建议。

本文例子假设异步服务是阻塞式调用,首先配置一个适合阻塞服务调用的线程池。一个应用的所有阻塞式服务可以共享这个线程池。在 src/main/resources/appliation.conf 文件加入如下内容。

1
2
3
4
5
6
7
blocking-dispatcher {
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min    = 2
    core-pool-size-max    = 10
  }
}

可以提供隐含参数 implicit val dispatcher = system.dispatchers.lookup("blocking-dispatcher") 来使用这个线程池,避免使用缺省的 Akka Actor 线程池。

2 mapAsync

Akka streams 为 SourceFlow 提供了 mapAsync 扩展方法用于执行异步操作。其第一个参数是并发的线程数,第二个参数为返回 Future[T] 的异步函数。当异步函数执行完成时,其返回值(类型为 T,而不是 Future[T])会作为流的数据单元发送到下游。下面是一个完整的例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object FutureDemo {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val dispatcher = system.dispatchers.lookup("blocking-dispatcher")

    object Processor {
      def process(number: Int) = Future {

        println(s"Process $number")
        Thread.sleep(500)
        number * 10
      }
    }

    val source = Source(1 to 7)
    val sink = Sink.foreach[Int](value => println(s"Sink: $value"))
    val result = source.mapAsync(3)(Processor.process).runWith(sink)
  }
}

/* 输出结果
Process 3
Process 1
Process 2
Sink: 10
Sink: 20
Process 4
Sink: 30
Process 5
Process 6
Sink: 40
Sink: 50
Sink: 60
Process 7
Sink: 70
 */

从输出结果可以看到,并发的异步执行阶段处理的次序可能不同,但是传到下游的数据单元和上游到达的次序一致。如果不需要保持原来的次序,通常可以加快数据处理速度。此时只需要把 mapAsync 换成 mapAsyncUnordered 即可。把上面代码改为使用 source.mapAsyncUnordered(3)(Processor.process).runWith(sink) 后,一种可能的输出结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Process 1
Process 3
Process 2
Sink: 10
Process 4
Sink: 30
Process 5
Sink: 20
Process 6
Sink: 50
Sink: 40
Process 7
Sink: 60
Sink: 70

3 和 mapasync 的区别

SourceFlowmap 操作通常是 Fusion 聚合后同步执行的。可是通过属性可以指定异步执行的线程池。比如这个例子:Source(1 to 7).map(_ + 1).withAttributes(ActorAttributes.dispatcher("blocking-dispatcher")).runWith(Sink.ignore) 中的操作是在指定的线程池中异步执行的。

这种异步和 mapAsync 的区别在于参数和并发度。 mapAsync 支持多个并发执行和返回 Future[T] 的异步函数。而 map 每次只处理一个数据单元并且对传入的函数没有太多限制。

async 和上面的 map 一样,在缺省的 Akka Streams 全局线程池异步执行,每次只处理一个数据单元,不支持并发。async 可以作为任何一个 Graph 的操作创建一个有缓存和回压的异步执行的新边界。