Akka Streams 的实体化值可以是任何数据类型,理解了这一点就比较容易理解动态的流处理:就是用实体化值对流进行一些有意思的操作比如动态的创建 Source 或 Sink。此处的动态是指创建的流在实体化运行后,可以接入多个不同的输入或输出。

1 MergeHub

MergeHub 是一个动态的多入口构件。其作用相当于一个多入口的 Sink,多个数据源可以把数据发送给 MergeHub 并共享相同的下游数据处理逻辑。下面是个基于 MergeHub 官方文档 的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import akka.actor.ActorSystem
import akka.NotUsed
import akka.stream.scaladsl.{MergeHub, RunnableGraph, Source, Sink}

object DynamicTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val consumer = Sink.foreach(println)

    val runnableGraph: RunnableGraph[Sink[String, NotUsed]] =
      MergeHub.source[String].to(consumer)

    val toConsumer: Sink[String, NotUsed] = runnableGraph.run()

    Source.single("Hello!").runWith(toConsumer)
    Source.single("Hub!").runWith(toConsumer)
  }
}

从类型签名可以看出,把 MergeHub 当成 Source 创建 RunnableGraph 并运行后,其实体化值的类型是一个 Sink 对象。此时可以把这个实体化值当成一个 Sink 来创建新的 RunnableGraph,所有这些新的 RunnableGraph 会根据保持到达次序传给原来最初的下游数据处理组件。

2 BroadcastHub

BroadcastHub 是一个动态的多出口构件。其作用相当于一个多出口的 Source,可以把一个数据源发送给多个下游处理组件。例子如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import akka.actor.ActorSystem
import akka.NotUsed
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source}

object DynamicTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    import scala.concurrent.duration._
    val producer = Source.tick(1.second, 1.second, "New message")

    val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
      producer.toMat(BroadcastHub.sink)(Keep.right)

    val fromProducer: Source[String, NotUsed] = runnableGraph.run()

    fromProducer.runForeach(msg => println("consumer1: " + msg))
    fromProducer.runForeach(msg => println("consumer2: " + msg))
  }
}

发布-注册服务

当把 MergeHubBroadcastHub 组合起来是会形成一个多入口多出口的发布-注册服务组件。其作用类似一个广播消息总线。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
mport akka.actor.ActorSystem
import akka.NotUsed
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, RunnableGraph, Sink, Source}

object DynamicTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("testStreams")
    implicit val ec = scala.concurrent.ExecutionContext.global

    val (sinkHub, sourceHub) =
      MergeHub
        .source[String]
        .toMat(BroadcastHub.sink)(Keep.both)
        .run()

    // 多个数据发布者
    val source1 = Source.single("Source1")
    val source2 = Source.single("Source2")
    source1.runWith(sinkHub)
    source2.runWith(sinkHub)

    // 多个数据注册者
    val sink1 = Sink.foreach[String](data => println(s"Sink1: $data"))
    val sink2 = Sink.foreach[String](data => println(s"Sink2: $data"))
    sourceHub.runWith(sink1)
    sourceHub.runWith(sink2)
  }
}