barrier
文章目录
- Chandy-Lamport算法
- Barrier
- Barrier触发逻辑
Chandy-Lamport算法
flink的checkpoint是基于Chandy-Lamport算法的改进版来实现的,通过固定间隔时间往source发送barrier,每个operator收到barrier时进行快照,如果有多个input时需要相同barrierId的barrier都到达才进行快照。这样就能实现At least once或exactly once语义。
参考分布式快照算法: Chandy-Lamport
Barrier
Barrier是Flink分布式Snapshotting中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。因为Barrier非常轻量,所以并不会中断数据流。带有Barrier的数据流,如下图所示:
基于上图,我们通过如下要点来说明:出现一个Barrier,在该Barrier之前出现的记录都属于该Barrier对应的Snapshot,在该Barrier之后出现的记录属于下一个Snapshot
来自不同Snapshot多个Barrier可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个Snapshot
当一个中间(Intermediate)Operator接收到一个Barrier后,它会发送Barrier到属于该Barrier的Snapshot的数据流中,等到Sink Operator接收到该Barrier后会向Checkpoint Coordinator确认该Snapshot,直到所有的Sink Operator都确认了该Snapshot,才被认为完成了该Snapshot
这里还需要强调的是,Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。也就是说,如果一个Operator包含任何形式的状态,这种状态必须是Snapshot的一部分。
Operator的状态包含两种:一种是系统状态,一个Operator进行计算处理的时候需要对数据进行缓冲,所以数据缓冲区的状态是与Operator相关联的,以窗口操作的缓冲区为例,Flink系统会收集或聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成;另一种是用户自定义状态(状态可以通过转换函数进行创建和修改),它可以是函数中的java对象这样的简单变量,也可以是与函数相关的Key/Value状态。
对于具有轻微状态的Streaming应用,会生成非常轻量的Snapshot而且非常频繁,但并不会影响数据流处理性能。Streaming应用的状态会被存储到一个可配置的存储系统中,例如HDFS。在一个Checkpoint执行过程中,存储的状态信息及其交互过程,如下图所示:
在Checkpoint过程中,还有一个比较重要的操作——Stream Aligning。当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐,如下图所示:
具体排列过程如下:
Operator从一个incoming Stream接收到Snapshot Barrier n,然后暂停处理,直到其它的incoming Stream的Barrier n(否则属于2个Snapshot的记录就混在一起了)到达该Operator
接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
继续处理来自多个Stream的记录
基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
Barrier触发逻辑
Barrier的接口类是CheckpointBarrierhandler,子接口为SelectedReadingBarrierHandler,子类包括
BarrierBuffer和BarrierTracker
其中BarrierBuffer可以保证Exactly-Once语义,以barrier阻塞输入直到所有的输入都接收到基于某个检查点的barrier,也就是上面所说的对齐。为了避免背压输入流,BarrierBuffer将从被阻塞的channel中持续地接收buffer并在内部存储它们,直到阻塞被解除。
BarrierTracker只能保证At-Least-Once语义,BarrierTracker会对各个输入接收到的检查点的barrier进行跟踪。一旦它观察到某个检查点的所有barrier都已经到达,它将会通知监听器检查点已完成,以触发相应地回调处理。
1.生成CheckpointBarrier对象,向streamOutput写入
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
2.下游的operator接收到本barrier,就会触发其自身的checkpoint,捕获barrier的过程其实就是处理input数据的过程,对应着StreamInputProcessor.processInput()方法。处理barrier的逻辑体现在getNextNonBlocked,将在下面的代码中讲到
public boolean processInput() throws Exception {
//省略
//每个元素都会触发这一段逻辑,如果下一个数据是buffer,则从外围的while循环里进入处理用户数据的逻辑;这个方法用来处理barrier的逻辑
//getNextNonBlocked方法将在下面讲到
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isempty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
//BarrierBuffer
public BufferOrEvent getNextNonBlocked() throws Exception {
return getNext(true);
}
BufferOrEvent getNext(boolean blocking) throws Exception {
while (true) {
optional<BufferOrEvent> next;
if (currentBuffered == null) {
next = blocking ? inputGate.getNextBufferOrEvent() : inputGate.pollNextBufferOrEvent();
}
else {
next = Optional.ofNullable(currentBuffered.getNext());
if (!next.isPresent()) {
completeBufferedsequence();
return getNext(blocking);
}
}
if (!next.isPresent()) {
if (inputGate.isFinished() && !endOfStream) {
endOfStream = true;
releaseBlocksAndResetBarriers();
return getNext(blocking);
}
else {
return null;
}
}
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
bufferBlocker.add(bufferOrEvent);
checkSizelimit();
}
else if (bufferOrEvent.isBuffer()) {
return bufferOrEvent;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
if (!endOfStream) {
//处理Barrier
processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
//取消Barrier
processcancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
}
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
processEndOfPartition();
}
return bufferOrEvent;
}
}
}
所谓Barrier对齐,就是一个opertor必须收到从每个inputchannel发过来的同一序号的barrier之后才能发起本节点的checkpoint,如果有的channel的数据处理的快了,那该barrier后的数据还需要缓存起来,如果有的inputchannel被关闭了,那它就不会再发送barrier过来了:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// 只有一个输入通道的情况
if (totalNumberOfInputChannels == 1) {
if (barrierId > currentCheckpointId) {
// 触发检查点
currentCheckpointId = barrierId;
notifyCheckpoint(receivedBarrier);
}
return;
}
//对于多个输入通道的情况
if (numBarriersReceived > 0) {
// this is only true if some alignment is already progress and was not canceled
if (barrierId == currentCheckpointId) {
// 等待所有相同barrierId 的Barrier到来
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
//当前checkpoint还没完成,下一个checkpoint已经到来,则废弃当前的checkpoint
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
// 开启新一轮Checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
return;
}
}
else if (barrierId > currentCheckpointId) {
// 第一次收到Barrier
beginNewAlignment(barrierId, channelIndex);
}
else {
return;
}
//所有的Barrier已到齐,开启本轮的Checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
//日志打印
releaseBlocksAndResetBarriers();
//执行Checkpoint
notifyCheckpoint(receivedBarrier);
}
}
当收到全部的barrier之后,就会触发notifyCheckpoint(),该方法又会调用StreamTask的triggerCheckpoint,和之前的operator是一样的。
文章最后发布于: 2019-05-06 16:18:44
相关阅读
java-多线程-CountDownLatch(闭锁) CyclicBarrier(栅
(代码来源网络共享) 这几个工具类其实说白了就是为了能够更好控制线程之间的通讯问题~ CountDownLatch 是一个同步的辅助类,允许
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBar
简介栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行
一、CyclicBarrier作用 它允许一组线程互相等待,直到到达某个公共屏障点。在设计一组固定大小的线程的程序中,这些线程必须不时
CyclicBarrier是多线程中一个重要的类,主要用于线程组内部之间的线程的相互等待问题。 1.CyclicBarrier的工作原理 Cycli