Flink 2PC 一致性语义

一、引申(什么是 XA 事务)

XA(eXtended Architecture)是指由 X/Open 组织提出的分布式交易处理的规范。XA 是一个分布式事务协议,由 Tuxedo 提出,所以分布式事务也称为 XA 事务。XA 协议主要定义了事务管理器 TM(Transaction Manager,协调者)和资源管理器 RM(Resource Manager,参与者)之间的接口。其中,资源管理器往往由数据库实现,如 Oracle、DB2、MySQL,这些商业数据库都实现了 XA 接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA 事务是基于两阶段提交(Two-phaseCommit,2PC)协议实现的,可以保证数据的强一致性,许多分布式关系型数据管理系统都采用此协议来完成分布式。阶段一为准备阶段,即所有的参与者准备执行事务并锁住需要的资源。当参与者 Ready 时,向 TM 汇报自己已经准备好。阶段二为提交阶段。当 TM 确认所有参与者都 Ready 后,向所有参与者发送 COMMIT 命令。

XA 事务允许不同数据库的分布式事务,只要参与在全局事务中的每个结点都支持 XA 事务。Oracle、MySQL 和 SQL Server 都支持 XA 事务。

XA 事务由一个或多个资源管理器(RM)、一个事务管理器(TM)和一个应用程序(ApplicationProgram)组成。

  • 资源管理器:提供访问事务资源的方法。通常一个数据库就是一个资源管理器。
  • 事务管理器:协调参与全局事务中的各个事务。需要和参与全局事务的所有资源管理器进行通信。
  • 应用程序:定义事务的边界。

XA 事务的缺点是性能不好,且无法满足高并发场景。一个数据库的事务和多个数据库间的 XA 事务性能会相差很多。因此,要尽量避免 XA 事务,如可以将数据写入本地,用高性能的消息系统分发数据,或使用数据库复制等技术。只有在其他办法都无法实现业务需求,且性能不是瓶颈时才使用 XA。

二、Flink 二阶段提交

2.1 引入 EXACTLY_ONCE 语义

EXACTLY_ONCE 语义简称 EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink 一直宣称自己支持 EOS,实际上主要是对于 Flink 应用内部来说的,对于外部系统 (端到端) 则有比较强的限制

外部系统写入支持幂等性

外部系统支持以事务的方式写入

Flink 在 1.4.0 版本引入了 TwoPhaseCommitSinkFunction 接口,并在 Kafka Producer 的 connector 中实现了它,支持了对外部 Kafka Sink 的 EXACTLY_ONCE 语义。

详见:End-to-End Exactly-Once Processing in Apache Flink

2.2 Kafka 幂等性和事务性

在 kafka 0.11 版本中已经提出,kafka 将对事务和幂等性的支持,使得 kafka 端到端 exactly once 语义成为可能。幂等性与事务性都是 Kafka 发展过程中非常重要的。

在正常情况下,produce 向 Broker 投递消息,broker 将消息追加写到对应的流(即某一个 topic 的某一 partition)中,并向 Producer 返回 ACK 信号,表示确认收到。

1、幂等性的实现

kafka 为了实想幂等性,他在底层的设计架构中引入了 Producer 和 SequenceNumber。

(1)、ProducerID:在每一个新的 Producer 初始化时,或被分配一个唯一的 ProducerID,这个 ProducerID 对客户端使用者是不可见的。

(2)、sequenceNumber: 对于每个 producerID,Producer 发送数据的每个 Topic 和 Partition 都对饮一个从 0 开始递增的 SequenceNumber 值。

2、当引入幂等性后解决的问题。

同样的数据发送到 kafka 中会对数据增加 Pid 和 sequenceId

2、事务

在数据端对端数据保证中,另个一个我们特别关注的问题就是事务。即原子性操作。对应的结果是同时成功或者同时失败,kafka 的事务注重的生产和消费的的原子性操作。典型的例子为。
一系列的 Producer 生产消息和消费消息提交 Offsets 的操作在一个事务中。

例如产生的场景包括:

(1)、producer 多次发送消息封装在一个原子性操作,即要求同时成功,或者同时失败。

(2)、在消费者 & 生产者的模式下,因为 consumer 在 commit offsets 出现问题时,导致重复消费消息时,需要将这个模式下的 Consumer 和 Commit offsets 操作和 Producer 一系列生产消息的操作封装成一个原子性操作。

(3)、kafka 的事务总体可以分为三方面的内容:

  • 1)、只有 Producer 生产消息,这种场景需要事务的介入;
  • 2)、消费消息和生产消息并存,比如 Consumer&Producer 模式,这种场景是一般 Kafka 项目中比较常见的模式,需要事务介入;
  • 3)、但是只有 Consumer 消费消息,这种操作在实际项目中意义不大,和手动 Commit Offsets 的结果一样,而且这种场景不是事务的引入目的。

事务提供的 5 种 API 方法:

org.apache.kafka.clients.producer.Producer<K,V>接口中:
// 1. 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 2. 开启事务
void beginTransaction() throws ProducerFencedException;
// 3. 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4. 提交事务
void commitTransaction() throws ProducerFencedException;
// 5. 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

详见:kafka 的幂等性和事务性

2.3 Flink 二阶段提交

场景描述

两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。

2PC 介绍:

2PC 在分布式系统中,为了让每个节点能够感知其他所有节点的事务执行情况,需要我们引入一个中心节点来统一所有节点的执行逻辑和进度,这个中心节点叫做协调者(Coordinator),而其中向中心节点汇报或者被中心节点调度的其他节点叫做参与者(Participant)

2PC 原理

①、请求阶段

  • 1、协调者向所有参与者发送准备请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
  • 2、参与者执行事务中的包含操作,并记录 undo 日志(用于回滚)和 redo 日志(用于重放),但是不真正提交。
  • 3、参与者向协调者返回事务才做的执行结果,执行陈工返回 yes, 否则返回 no.

②、提交阶段(分成成功失败两种情况)

情况 1: 若所有的参与者都返回 yes, 说明事务可以提交。

  • 1、协调者向所有参与者发送 commit 请求。
  • 2、参与者收到 commit 请求后,将事务真正的提交上去,并释放占用的事务资源,并向协调者返回 ack。
  • 3、协调者收到所有参与者 ack 消息,事务成功完成。

情况 2:若有参与者返回 no 或者超时未返回,说明事务终端,需要回滚。

  • 1、协调者向所有参与者发送 rollback 请求。
  • 2、参与者收到 rollback 请求后,根据 undo 日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回 ack。
  • 3、协调者收到所有参与者的 ack 消息,事务回滚完成。
在这里插入图片描述

2pc 的优缺点

2PC 的优点在于原理非常简单,容易理解及实现。

缺点主要有 3 个,列举如下:

(1)协调者存在单点问题。如果协调者挂了,整个 2PC 逻辑就彻底不能运行。

(2)、执行过程是完全同步的。各参与者在等待其他参与者响应的过程中都处于阻塞状态,大并发下有性能问题。

(3)、仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了 commit 请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。

不过,现在人们在分布式一致性领域做了很多工作,以 ZooKeeper 为代表的分布式协调框架也数不胜数,2PC 有了这些的加持,可靠性大大提升了,也就能够真正用在要求高的生产环境中了。

Flink 基于 2PC 的实现

2PC 的最常见应用场景其实是关系型数据库,比如 mysql InnoDB 存储引擎的 XA 事务系统。
Flink 作为流式处理引擎,自然也提供了对 exactly once 语义的保证。flink 的内部意图检查点机制和轻量级分布式快照算法 ABS 保证 exactly once .。二我们要实现端到端的精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

在 Spark Streaming 中,要实现事务性写入完全靠用户自己,框架本身并没有提供任何实现。但是在 Flink 中提供了基于 2PC 的 SinkFunction,名为 TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。

在这里插入图片描述

flink 官方推荐所有需要保证 exactly once 的 sink 逻辑都继承该抽象类。它具体定义如下四个抽象方法。需要我们去在子类中实现。

// 开始一个事务,返回事务信息的句柄
protected abstract TXN beginTransaction() throws Exception;
// 预提交(即提交请求)阶段的逻辑
protected abstract void preCommit(TXN transaction) throws Exception;
// 正式提交阶段的逻辑
protected abstract void commit(TXN transaction);
// 取消事务
protected abstract void abort(TXN transaction);
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
FlinkKafkaProducer.KafkaTransactionState,
FlinkKafkaProducer.KafkaTransactionContext> {
/**
* Semantics that can be chosen.
* <li>{@link #EXACTLY_ONCE}
* <li>{@link #AT_LEAST_ONCE}
* <li>{@link #NONE}
*/
public enum Semantic {
/**
* Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
* that will be committed to Kafka on a checkpoint.
*
* <p>In this mode {@link FlinkKafkaProducer} sets up a pool of {@link
* FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
* which is committed on {@link
FlinkKafkaProducer#notifyCheckpointComplete(long)}
. If
* checkpoint complete notifications are running late, {@link FlinkKafkaProducer} can run
* out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
* {@link FlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and
* {@link FlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from
* the previous checkpoint. To decrease the chance of failing checkpoints there are four
* options:
* <li>decrease number of max concurrent checkpoints
* <li>make checkpoints more reliable (so that they complete faster)
* <li>increase the delay between checkpoints
* <li>increase the size of {@link FlinkKafkaInternalProducer}s pool
*/
EXACTLY_ONCE,
/**
* Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the
* Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
*/
AT_LEAST_ONCE,
/**
* Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or
* duplicated in case of failure.
*/
NONE
}

下面以 Flink 与 Kafka 的集成来说明 2PC 的具体流程。注意这里的 Kafka 版本必须是 0.11 及以上,因为只有 0.11 + 的版本才支持幂等 producer 以及事务性,从而 2PC 才有存在的意义。Kafka 内部事务性的机制如下框图所示。

flink 实现两阶段提交具体实现为:
FlinkKafkaProducer.commit () 方法实际上是代理了 KafkaProducer.commitTransaction () 方法,正式向 Kafka 提交事务。

Flink 版本:1.13.6

@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}

该方法的调用点位于 TwoPhaseCommitSinkFunction.notifyCheckpointComplete () 方法中,顾名思义,当所有的检查点都成功后,会调用这个方法。

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
// the following scenarios are possible here
//
// (1) there is exactly one transaction from the latest checkpoint that
// was triggered and completed. That should be the common case.
// Simply commit that transaction in that case.
//
// (2) there are multiple pending transactions because one previous
// checkpoint was skipped. That is a rare case, but can happen
// for example when:
//
// - the master cannot persist the metadata of the last
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other tasks could not persist their status during
// the previous checkpoint, but did not trigger a failure because they
// could hold onto their state and could successfully persist it in
// a successive checkpoint (the one notified here)
//
// In both cases, the prior checkpoint never reach a committed state, but
// this checkpoint is always expected to subsume the prior one and cover all
// changes since the last successful one. As a consequence, we need to commit
// all pending transactions.
//
// (3) Multiple transactions are pending, but the checkpoint complete notification
// relates not to the latest. That is possible, because notification messages
// can be delayed (in an extreme case till arrive after a succeeding checkpoint
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
//
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info(
"{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(),
checkpointId,
pendingTransaction,
pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException(
"Committing one of transactions failed, logging first encountered failure",
firstError);
}
}

从代码中可以看出,该方法每次从赈灾等待提交的事务句柄中取出一个,检查他的检查点 ID, 并调用 commit () 方法提交,这个阶段流程图为:

在这里插入图片描述

可见,只有在所有的检查点都成功的这个前提下,写入才会成功。这符合前文描述 2PC 的流程。其中 jobmanager 为协调者,各个算子为参与者,并且中有 sink 一个参与者会执行提交。一旦有了检查点失败,notifyCheckpointComplete () 方法不会执行,如果重试不成功,则最后会调用 abort () 方法回滚事务,如下:

@Override
protected void abort(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
}
}

参考 list:


暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇