找回密码
 立即注册
查看: 542|回复: 1

Flink Checkpoint道理解析

[复制链接]

1

主题

0

回帖

23

积分

新手上路

积分
23
发表于 2023-5-5 17:43:48 | 显示全部楼层 |阅读模式
Flink是一个有状态的分布式流式计算引擎,flink中的每个function或者是operator都可以是有状态的,有状态的function在措置流数据或事件的的同时会存储一部门用户自定义的数据,这使得flink的状态可以作为任何更精细操作的基础。然而总会有一些原因使流任务呈现异常(如网络故障、代码bug等),为了使得状态可以容错,flink引入了checkpoint机制。checkpoint使得flink能够恢复流任务的状态和位置,从而为流任务提供与无故障执行不异的语义。下面对flink的checkpoint机制进行总结,并分享一些使用经验。
Paper

Flink的checkpoint的过程依赖于异步樊篱快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就大白了flink的chekpoint机制。paper整体来说斗劲简单易懂,下面简单介绍下paper的大体内容和核心的算法。

  • 概览
有状态的分布式流式计算使得大规模的部署和云计算成为可能,而且有着两个方针--低延迟和高吞吐。一个最基础的挑战就是如安在计算或任务可能掉败的情况下,提供数据一致性的保证。之前的凡是的方式是依赖周期性的全局快照,然而这些方式有两个错误谬误。第一,他们凡是需要暂停整个计算逻辑,这凡是会影响数据的摄取。第二,他们凡是需要持久化所有在传输中的数据,这会导致生成的快照比实际需要的大很多。
这篇paper主要介绍了一个轻量级分布式异步快照,只需要保留较少的数据即可。而且将这种轻量级的算法在flink中实现,之后通过验证表白,这种算法对数据的措置和计算影响很小,而且拥有线性的可扩展性,而且在快照斗劲频繁的时候性能依旧良好。接下来paper首先介绍了现存的几种流计算种使用的快照算法,以及存在的问题,而且简单的介绍了flink的流计算架构,主要是flink的一些基本概念,因此不再展开介绍。之后就是这篇paper的重点,分布式异步快照的实现以及状态的恢复。最后是性能评估和总结。

  • Asynchronous Barrier Snapshotting(ABS)
为了保证输出成果的一致性,分布式流措置系统凡是要能措置任务掉败的情况。一种常用的做法是周期性的发生快照,在任务掉败的情况下能后从快照恢复运行。快照是任务计算图的一个全局的状态,包含所有能够恢复任务的必要的数据。对于一个快照而言,从最终性(Termination)与可行性(feasibility)两个方面来阐述如何保障成果的正确性。
ABS将计算逻辑通过有向无环图划分为几个阶段(stage),每个stage将所有的输入数据和输出成果划分为一系列的措置中间状态,对于每一个stage来说,所有先前的输入和输出成果都已经完全措置。所以ABS的核心思想就是创建分阶段的快照,同时保持持续的数据摄取。

接下来介绍周期性的barrier如何起感化。
1)中央协调者周期性的从源头注入barrier。
2)当source接受到barrier后,当即发生一个快照,之后将barrier广播到所有的输出端。
3)当非source算子收到此中一个输入端的barrier后,立刻阻塞这个channel;这个channel中被阻塞的数据将会被缓存起来;直到改算子收到了所有的上游输入的barrier。
4)当非source算子接收到所有的上游输入的barrier之后,则会当即生成快照,保留当前算子的状态,之后将barrier广播到所有的输出端。
5)这个算子解除对上游输入channel的阻塞,继续计算新的输入数据。当数据最后完成sink,一个完整的查抄点才算完成。
如何证明最终性?数据channel的可靠性保证了每一个barrier城市被接收,只要任务还存活。此外的有向无环图中,数据的流动路径是确定的,每个算子最终城市收到所有上游算子的   barrier并触发生成快照。
如何证明一致性?由于输入通道的先进先出原则和barrier的阻塞功能,确保了在快照完成之前,下游算子不会计算阻塞的数据。

  • 掉败恢复
1)每个算子从持久化存储中恢复对应的状态作为其初始状态。
2)还原并措置备份的数据。
3)从输入通道从头摄取数据

如上图所示,任务只能恢复部门算子的情况也是有可能发生的,为了保证计算不反复,可以通过重放上游数据的方式进行措置。比如通过对比数据的offset,将offset小于已经措置过的最大的offset的数据丢弃。
以上就是这篇paper的概略内容和核心算法。

Flink Checkpoint

Flink整体来说是一个主从架构,JobManager负责资源打点、任务调剂、checkpoint触发等等协调性的工作,TaskManager负责按照代码逻辑措置数据,真正执行任务的角色。由于Flink的slot-sharing机制,一个TaskManager可能有多个子任务。
checkpoint流程
在checkpoint过程中,JobManager会按照代码配置的法则,按期触发checkpoint,在所有的source的子任务中注入checkpointBarrier,TaskManager在收到所有上游广播的CheckpointBarrier 后,触发checkpoint。当整个DAG图的子任务的checkpoint都做完之后,会陈述请示给JobManager,JobManager则认为这个checkpoint已经完成。整个流程与上述paper中的流程几乎一致。

Flink Checkpoint撑持两种语义,Exactly_Once 和 At_Least_Once。这两种语义的区别主要在于对barrier 对齐方式的措置,Exactly_Once的算子在接受到第一个barrier后会阻塞上游输入通道,将输入数据缓存起来,知道所有上游的barrier都收到之后才会继续措置这些数据。At_Least_Once则不会阻塞上游数据,即使没有对齐barrier也会措置后续的数据。
源码分析
Checkpoint的主要流程可以CheckpointCoordinator类中看到。
  1.         public PendingCheckpoint triggerCheckpoint(
  2.                         long timestamp,
  3.                         CheckpointProperties props,
  4.                         @Nullable String externalSavepointLocation,
  5.                         boolean isPeriodic,
  6.                         boolean advanceToEndOfTime) throws CheckpointException {
  7.                 if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
  8.                         throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
  9.                 }
  10.                 // 做一些预先的查抄
  11.                 synchronized (lock) {
  12.                         // 若此时coordinator已经被封锁则checkpoint掉败
  13.                         if (shutdown) {
  14.                                 throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
  15.                         }
  16.                         // 若是周期性执行而调剂器已经封锁则checkpoint掉败
  17.                         if (isPeriodic && !periodicScheduling) {
  18.                                 throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
  19.                         }
  20.                         // 验证checkpoint是否达到最大并发数,验证checkpoint之间的最小时间间隔
  21.                         // 默认最大并发度为1,最小间隔为0,这些参数都可以提前自定义配置
  22.                         // 这些查抄都和savepoint没有关系
  23.                         if (!props.forceCheckpoint()) {
  24.                                 // 查抄是否只有一个触发checkpoint的请求
  25.                                 if (triggerRequestQueued) {
  26.                                         LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
  27.                                         throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
  28.                                 }
  29.                                 // 查抄当前有多少checkpoint在pending
  30.                                 checkConcurrentCheckpoints();
  31.                                 // 查抄最小时间间隔
  32.                                 checkMinPauseBetweenCheckpoints();
  33.                         }
  34.                 }
  35.                 // 查抄所有子任务的状态,若任务不在运行则直接丢弃此次checkpoint
  36.                 Execution[] executions = new Execution[tasksToTrigger.length];
  37.                 for (int i = 0; i < tasksToTrigger.length; i++) {
  38.                         Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
  39.                         if (ee == null) {
  40.                                 LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
  41.                                                 tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  42.                                                 job);
  43.                                 throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
  44.                         } else if (ee.getState() == ExecutionState.RUNNING) {
  45.                                 executions[i] = ee;
  46.                         } else {
  47.                                 LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
  48.                                                 tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  49.                                                 job,
  50.                                                 ExecutionState.RUNNING,
  51.                                                 ee.getState());
  52.                                 throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
  53.                         }
  54.                 }
  55.                 // 查抄所有待确认算子的运行状态,若不在运行中则直接丢弃此次checkpoint
  56.                 Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
  57.                 for (ExecutionVertex ev : tasksToWaitFor) {
  58.                         Execution ee = ev.getCurrentExecutionAttempt();
  59.                         if (ee != null) {
  60.                                 ackTasks.put(ee.getAttemptId(), ev);
  61.                         } else {
  62.                                 LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
  63.                                                 ev.getTaskNameWithSubtaskIndex(),
  64.                                                 job);
  65.                                 throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
  66.                         }
  67.                 }
  68.                 // 做完上述的一系列查抄后开始真正触发checkpoint
  69.                 // 通过加特殊的锁来确保触发请求不会并发
  70.                 synchronized (triggerLock) {
  71.                         // checkpoint保留路径
  72.                         final CheckpointStorageLocation checkpointStorageLocation;
  73.                         final long checkpointID;
  74.                         try {
  75.                                 // 这个必需使用全局的锁,因为与外部的系统交互可能会导致阻塞一段时间
  76.                                 // checkpointID凡是是一个递增的时间戳
  77.                                 checkpointID = checkpointIdCounter.getAndIncrement();
  78.                                 checkpointStorageLocation = props.isSavepoint() ?
  79.                                                 checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
  80.                                                 checkpointStorage.initializeLocationForCheckpoint(checkpointID);
  81.                         }
  82.                         catch (Throwable t) {
  83.                                 // 若掉败则记录掉败次数,这些指标都可以通过metrics收集起来,用于checkpoint监控
  84.                                 int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
  85.                                 LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
  86.                                                 job,
  87.                                                 numUnsuccessful,
  88.                                                 t);
  89.                                 throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
  90.                         }
  91.                         // 创建一个等待执行的Checkpoint
  92.                         final PendingCheckpoint checkpoint = new PendingCheckpoint(
  93.                                 job,
  94.                                 checkpointID,
  95.                                 timestamp,
  96.                                 ackTasks,
  97.                                 props,
  98.                                 checkpointStorageLocation,
  99.                                 executor);
  100.                         if (statsTracker != null) {
  101.                                 PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
  102.                                         checkpointID,
  103.                                         timestamp,
  104.                                         props);
  105.                                 checkpoint.setStatsCallback(callback);
  106.                         }
  107.                         // 调剂器会清理掉过期的checkpoint
  108.                         final Runnable canceller = () -> {
  109.                                 synchronized (lock) {
  110.                                         // 若checkpoint已颠末期则直接丢弃
  111.                                         if (!checkpoint.isDiscarded()) {
  112.                                                 LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
  113.                                                 failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
  114.                                                 pendingCheckpoints.remove(checkpointID);
  115.                                                 rememberRecentCheckpointId(checkpointID);
  116.                                                 triggerQueuedRequests();
  117.                                         }
  118.                                 }
  119.                         };
  120.                         try {
  121.                                 // 从头获取coordinator锁
  122.                                 synchronized (lock) {
  123.                                         // 因为之前做完预查抄后释放了锁,所以需要再查抄下上述条件
  124.                                         if (shutdown) {
  125.                                                 throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
  126.                                         }
  127.                                         else if (!props.forceCheckpoint()) {
  128.                                                 if (triggerRequestQueued) {
  129.                                                         LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
  130.                                                         throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
  131.                                                 }
  132.                                                 checkConcurrentCheckpoints();
  133.                                                 checkMinPauseBetweenCheckpoints();
  134.                                         }
  135.                                         LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
  136.                                         pendingCheckpoints.put(checkpointID, checkpoint);
  137.                                         // 创建超时打消线程
  138.                                         ScheduledFuture<?> cancellerHandle = timer.schedule(
  139.                                                         canceller,
  140.                                                         checkpointTimeout, TimeUnit.MILLISECONDS);
  141.                                         if (!checkpoint.setCancellerHandle(cancellerHandle)) {
  142.                                                 // 若checkpoint超时则会被打消
  143.                                                 cancellerHandle.cancel(false);
  144.                                         }
  145.                                         // 提前创建钩子等待所有子任务触发checkpoint
  146.                                         final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
  147.                                                         checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
  148.                                         for (MasterState s : masterStates) {
  149.                                                 checkpoint.addMasterState(s);
  150.                                         }
  151.                                 }
  152.                                 // end of lock scope
  153.                                 final CheckpointOptions checkpointOptions = new CheckpointOptions(
  154.                                                 props.getCheckpointType(),
  155.                                                 checkpointStorageLocation.getLocationReference());
  156.                                 // 向所有子任务发送动静触发checkpoint
  157.                                 for (Execution execution: executions) {
  158.                                         if (props.isSynchronous()) {
  159.                                                 execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
  160.                                         } else {
  161.                                                 // 触发checkpoint,从source算子广播CheckBarrier,当所有算子都完成Checkpoint后,jobManager认为Checkpoint已完成
  162.                                                 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
  163.                                         }
  164.                                 }
  165.                                 // 将掉败的触发器数量归0
  166.                                 numUnsuccessfulCheckpointsTriggers.set(0);
  167.                                 return checkpoint;
  168.                         }
  169.                         catch (Throwable t) {
  170.                                 // checkpoint呈现异常则则从pending缓存中去除
  171.                                 synchronized (lock) {
  172.                                         pendingCheckpoints.remove(checkpointID);
  173.                                 }
  174.                                 // 记录checkpoint掉败次数
  175.                                 int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
  176.                                 LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
  177.                                                 checkpointID, job, numUnsuccessful, t);
  178.                                 if (!checkpoint.isDiscarded()) {
  179.                                         failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
  180.                                 }
  181.                                 try {
  182.                                         // 掉败后断根此次checkpoint的保留路径和该路径下的所有数据
  183.                                         checkpointStorageLocation.disposeOnFailure();
  184.                                 }
  185.                                 catch (Throwable t2) {
  186.                                         LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
  187.                                 }
  188.                                 throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
  189.                         }
  190.                 } // end trigger lock
  191.         }
复制代码
监控与优化

如果Flink任务对数据一致性有较高的需求,那么checkpoint的指标收集和监控必不成少。一个是可以周期性的检测checkpoint掉败次数并触发报警。此外,可以监控checkpoint的耗时和大小,若耗时较大则可以通过调整checkpoint的间隔来优化,而且过于频繁的checkpoint会导致hdfs小文件过多,增加namenode压力。若checkpoint较大可以测验考试使用RocksDB并开启增量checkpoint。若任务恢复的过程中若checkpoint较大则会从HDFS等存储系统恢复数据,恢复时间可能较长,则可以设置当地状态恢复。
若从checkpoint恢复任务掉败,则可以考虑降级法子恢复,比如通过Flink Metrics将任务消费到的kafka offset按期同步下来,在checkpoint不成用的时候,可以通过指定kafka offset的方式来恢复任务,虽然无法保证数据一致性,但对于大大都一致性要求不高的场景也可以适用。


参考文献
回复

使用道具 举报

0

主题

4

回帖

15

积分

新手上路

积分
15
发表于 2023-5-5 17:44:05 | 显示全部楼层
太棒了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|T9AI - 深度人工智能平台 ( 沪ICP备2023010006号 )

GMT+8, 2024-12-4 16:22 , Processed in 0.055970 second(s), 23 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表