在flink中的每个函数和运算符都是有状态的。在处理过程中可以用状态来存储数据,这样可以利用状态来构建复杂操作。为了让状态容错,Flink需要设置checkpoint状态。Flink程序是通过checkpoint来保证容错,通过c
在flink中的每个函数和运算符都是有状态的。在处理过程中可以用状态来存储数据,这样可以利用状态来构建复杂操作。为了让状态容错,Flink需要设置checkpoint状态。Flink程序是通过checkpoint来保证容错,通过checkpoint机制,Flink可恢复作业的状态和计算位置。
Flink的checkpoin机制需要与流和状态的持久化存储交互,一般它要求:
默认情况,Flink是禁用检查点。要启用检查点,调用
// 启用检查点
// 单位:毫秒
env.enableCheckpointing(1000);
在启用检查点时,还可以配置检查点的其他参数。
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
参考配置:
// --------
// 配置checkpoint
// 启用检查点
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATioN);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Flink的checkpoint机制可以存储计时器和有状态operation的所有快照,包括:连接器、窗口或者用户自定义状态。具体checkpoint存储在哪儿(例如:是JobManager内存、文件系统或者数据库),依赖于状态后端的配置。
默认情况,状态保存在TaskManager的内存中,检查点存储在TM的内存中。为了适当地保存大状态,Flink支持其他的存储。我们可以通过:
StreamExecutionEnvironment.setStateBackend(…)
来指定存储方式
状态的应用场景:
Flink状态可以保存在堆内、或者是堆外。Flink也可以管理应用程序的状态,必要时也可以溢出到磁盘,如果应用要保持非常大的状态,可以不修改程序逻辑情况下配置状态后端存储。
Flink中有两种基本的状态:
Keyed State
Keyed State通常和key相关,仅仅在KeyedStream的方法和算子中使用。可以把 Keyed State看作是分区,而且每一个key仅出现在一个分区内。逻辑上每个 keyed-state和唯一元组<算子并发实例, key>绑定,由于每个key仅属于算子的一个并发,因此可以简化为<算子, key>
Operator State
对于 Operator State来说,每个Operator State和一个并发实例绑定。Kafka connector是Flink中使用operator state的一个很好的示例。每个Kafka消费者的并发在Operator State中维护一个 topic partition到offset的映射关系。
Operator state在Flink作业的并发改变后,会重新分发状态,分发的策略和keyed stated不一样。
Raw State与Managed State
Keyed Stated和Operator State分别有两种形式:managed 和 raw
Managed State是由Flink运行时管理的数据结构来表示的,例如:内部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink运行时会对这些状态进行编码并写入Checkpoint。
Raw State则保存在自己的数据结构中。checkpoint的时候,Flink并不知道状态里面具体的内容,仅仅写入一串字节序列到checkpoint中。
所有的DataStream的function都可以使用managed state,但raw state只能在实现算子时使用。由于Flink可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因为讲义使用 managed state.
Managed keyed state接口提供不同类型的状态访问接口,这些状态都作用在当前输入数据的key下。这些状态仅可在KeyedStream上使用,可以通过 stream.keyBy(…)得到KeyedStream。
所有支持的状态类型如下:
注意:
- 这些状态对象仅用于状态交互。状态本身不一定存储在内存中,还有可能保存在磁盘或者其他位置
- 从状态中获取的值取决于输入元素说代表的key,因此,在不同key上调用同一个接口,可能得到不同的值
可以通过实现 CheckpointedFunction 或者 ListCheckpointed
CheckpointedFunction接口:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
在Flink进行checkpoint时,会调用snapshotstate(),用户自定义函数初始化时会调用 initializeState。初始化包括第一次自定义函数初始化和从之前的 checkpoint 回复。因此,initializeState 中应该也包括状态恢复的逻辑。
Managed Operator State以list的形式存在,这些状态是一个可序列化对象的集合List,彼此独立,方便在改变并发后进行状态的重新分派。换句话说,这些对象是重新分配 non-keyed state的最细粒度。根据状态的不同访问方式,有以下两种分配模式:
ListCheckpointed接口:
ListCheckpointed接口是CheckpointedFunction接口的精简版,仅支持 even-split redistribution的list state
List
snapshotState(long checkpointId, long timestamp) throws Exception; void restoreState(List
state) throws Exception;
snapshotState()需要返回一个将写入到checkpoint的对象列表, restoreState则需要处理恢复回来的对象列表。
参考文献:
Flink官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html
Https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html
--结束END--
本文标题: 「Flink」Flink的状态管理与容错
本文链接: https://lsjlt.com/news/4027.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0