kill switch
akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。任何时候如果需要终止运行中的数据流就必须采用一种任务柄(handler)方式来控制在其它线程内运行的任务。这个handler可以在提交运算任务时获取。akka-stream提供了KillSwitch trait来支持这项功能:
/**
* A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked
* to the switch. Depending on whether the [[KillSwitch]] is a [[uniqueKillSwitch]] or a [[SharedKillSwitch]] one or
* multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of
* this interface.
*/
//#kill-switch
trait KillSwitch {
/**
* After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
*/
def shutdown(): Unit
/**
* After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed.
*/
def abort(ex: throwable): Unit
}
//#kill-switch
可以想象:我们必须把这个KillSwitch放在一个流图中间,所以它是一种FlowShape的,这可以从KillSwitch的构建器代码里可以看得到:
object KillSwitches {
/**
* Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple
* streams from the outside simultaneously.
*
* @see SharedKillSwitch
*/
def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name)
/**
* Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
* of that unique materialization. Different materializations result in different, independent switches.
*
* For a Bidi version see [[KillSwitch#singleBidi]]
*/
def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] =
UniqueKillSwitchStage.asinstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]]
/**
* Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
* of that unique materialization. Different materializations result in different, independent switches.
*
* For a Flow version see [[KillSwitch#single]]
*/
def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] =
UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]]
...}
stream提供了single,shared,singleBidi三种KillSwitch的构建方式,它们的形状都是FlowShape。KillSwitches.single返回结果类型是Graph[FlowShape[T,T],UniqueKillSwitch]。因为我们需要获取这个KillSwitch的控制柄,所以要用viaMat来可运算化(materialize)这个Graph,然后后选择右边的类型UniqueKillSwitch。这个类型可以控制一个可运算化FlowShape的Graph,如下:
val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
val sink = Sink.foreach(println)
val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()
scala.io.StdIn.readLine()
killSwitch.shutdown()
println("terminated!")
actorSys.terminate()
当然,也可以用异常方式中断运行:
killSwitch.abort(new runtimeexception("boom!"))
source是一个不停顿每秒发出一个数字的数据源。如上所述:必须把KillSwitch放在source和sink中间形成数据流完整链状。运算这个数据流时返回了handle killSwitch,我们可以使用这个killSwitch来shutdown或abort数据流运算。
KillSwitches.shared构建了一个SharedKillSwitch类型。这个类型可以被用来控制多个FlowShape Graph的终止运算。SharedKillSwitch类型里的flow方法可以返回终止运算的控制柄handler:
/**
* Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
* [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
* switch will be stopped normally or failed.
*
* @tparam T Type of the elements the Flow will forward
* @return A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself.
*/
def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]]
用flow构建的SharedKillSwitch实例就像immutable对象,我们可以在多个数据流中插入SharedKillSwitch,然后用这一个共享的handler去终止使用了这个SharedKillSwitch的数据流运算。下面是SharedKillSwitch的使用示范:
val sharedKillSwitch = KillSwitches.shared("multi-ks")
val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)
source2.via(sharedKillSwitch.flow).to(sink).run()
source.via(sharedKillSwitch.flow).to(sink).run()
scala.io.StdIn.readLine()
killSwitch.shutdown()
sharedKillSwitch.shutdown()
注意:我们先构建了一个SharedKillSwitch实例,然后在source2,source数据通道中间加入了这个实例。因为我们已经获取了sharedKillSwitch控制柄,所以不必理会返回结果,直接用via和to来连接上下游节点(默认为Keep.left)。
还有一个KillSwitches.singleBidi类型,这种KillSwitch是用来终止双流向数据流运算的。我们将在下篇讨论里介绍。
下面是本次示范的源代码:
import akka.stream.scaladsl._
import akka.stream._
import akka.actor._
import scala.concurrent.duration._
object KillSwitchDemo extends APP {
implicit val actorSys = ActorSystem("sys")
implicit val ec = actorSys.dispatcher
implicit val mat = ActorMaterializer(
ActorMaterializersettings(actorSys)
.withinputBuffer(16,16)
)
val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure)
val sink = Sink.foreach(println)
val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run()
val sharedKillSwitch = KillSwitches.shared("multi-ks")
val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure)
source2.via(sharedKillSwitch.flow).to(sink).run()
source.via(sharedKillSwitch.flow).to(sink).run()
scala.io.StdIn.readLine()
killSwitch.shutdown()
sharedKillSwitch.shutdown()
println("terminated!")
actorSys.terminate()
}
相关阅读
C/C++ 学习笔记:istringstream、ostringstream、string
0、C++的输入输出分为三种:(1)基于控制台的I/O(2)基于文件的I/O(3)基于字符串的I/O 1、头文件[cpp] view plaincopyprint? #incl
Stream Processing: Apache Kafka的Exactly-once的定
2018年,Apache Kafka以一种特殊的设计和方法实现了强语义的exactly-once和事务性。热泪盈眶啊!这篇文章将讲解kafka中exactly-once
使用StreamReader和StreamWriter读取和写入文本文件
1.读取文本文件using System; using System.Collections.Generic; using System.Text; using System.Linq; using System.Collect
java基础io流——OutputStream和InputStream的故事(温
io流概述: IO流用来处理设备之间的数据传输,上传文件和下载文件,Java对数据的操作是通过流的方式,Java用于操作流的对象都在IO包中。
c#语言------------FileStream类的基本使用
FileStream类不是静态类,使用时需要创建对象,FileStream类既可以对文本文件进行读也可以对多媒体文件进行写,以字节数组的形式进行读