의문의 시작
Flink CDC의 MySQL snapshot은 청크 단위로 SELECT문을 이용해 데이터를 복사하고, SHOW MASTER STATUS문을 이용해 GTIDs 값을 기록합니다.
이후 binlog streaming 단계로 전환할 때, 모든 청크의 GTIDs 값 중 가장 낮은(오래된) 값부터 binlog event를 읽기 시작합니다. 이걸 보는데 문득 이런 생각이 들었습니다.
snapshot에서 이미 가져온 데이터가 binlog에서 다시 나와서, sink가 upsert 모드가 아니면 중복 insert가 쌓이는 것 아닐까?
결론부터 말하면, 중복은 발생하지 않았습니다.
Flink CDC는 Netflix의 DBLog 논문에 기반한 2단계 중복 방지 메커니즘을 갖고 있기 때문입니다.
아래는 그 과정을 설명합니다.
1단계: Chunk 정규화 (Normalization)
각 chunk를 읽을 때는 3개의 step이 순차적으로 실행됩니다.
MySqlSnapshotSplitReadTask.java
// Step 1: Low Watermark 기록
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
// Step 2: SELECT 실행 - chunk 데이터 읽기
createDataEvents(ctx, snapshotSplit.getTableId());
// Step 3: High Watermark 기록
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
여기서 currentBinlogOffset()은 내부적으로 SHOW MASTER STATUS를 실행하여 현재 binlog file, position, GTID set을 가져옵니다.
그다음이 정규화의 핵심으로, SnapshotSplitReader.java에서 backfill 과정이 이어집니다
// Step 1: execute snapshot read task
SnapshotResult<MySqlOffsetContext> snapshotResult = snapshot(sourceContext);
// Step 2: read binlog events between low and high watermark and backfill changes into snapshot
backfill(snapshotResult, sourceContext);
backfill은 Low~High Watermark 사이의 binlog을 읽어 snapshot 데이터에 PK 기준 upsert 합니다.
특히 SnapshotSplitReader.pollWithBuffer() 메서드의 주석이 이를 명확히 설명하고 있습니다
// data input: [low watermark event][snapshot events][high watermark event]
// [binlog events][binlog-end event]
// data output: [low watermark event][normalized events][high watermark event]
pollWithBuffer() 메서드 수행 후 정규화가 되면서 binlog events가 snapshot events에 upsert 되고 normalized events로 바뀐 모습을 볼 수 있습니다.
실제 정규화 코드 (SnapshotSplitReader.java):
if (!reachBinlogStart) {
// snapshot 데이터를 PK 기준 Map에 저장
snapshotRecords.put((Struct) record.key(), Collections.singletonList(record));
} else {
// binlog 이벤트로 snapshot 데이터를 upsert
RecordUtils.upsertBinlog(snapshotRecords, record, ...);
}
RecordUtils.upsertBinlog()는 binlog 이벤트의 operation 타입에 따라 동작합니다.
- INSERT(CREATE): 해당 PK의 snapshot 레코드를 교체
- UPDATE: AFTER 이미지로 교체 (단, Split 범위 밖이라면 warn log를 뱉고 처리하지 않음)
- DELETE: 해당 PK의 레코드를 제거
이 과정을 거치면 각 chunk는 High Watermark 시점의 일관된 스냅샷으로 정규화됩니다.
2단계: Binlog Streaming의 Per-Chunk 필터링
모든 snapshot chunk가 완료되면 binlog streaming이 시작됩니다. 시작 위치는 모든 chunk의 High Watermark 중 가장 낮은 값입니다.
public static BinlogOffset getStartingOffsetOfBinlogSplit(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits) {
BinlogOffset startOffset = finishedSnapshotSplits.get(0).getHighWatermark();
for (FinishedSnapshotSplitInfo finishedSnapshotSplit : finishedSnapshotSplits) {
if (finishedSnapshotSplit.getHighWatermark().isBefore(startOffset)) {
startOffset = finishedSnapshotSplit.getHighWatermark();
}
}
return startOffset;
}
여기서 맨 처음 의문이 든 "가장 낮은 값부터 시작하면 중복 아니냐?"라는 질문이 나오게 되었는데, 이를 아래 코드에서 명쾌하게 해결하는 걸 알 수 있었습니다.
답은 BinlogSplitReader.shouldEmit() 메서드입니다.
BinlogSplitReader.java L:250-291
/**
* Returns the record should emit or not.
*
* <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
* belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
* since the offset is after its high watermark.
*
* <pre> E.g: the data input is :
* snapshot-split-0 info : [0, 1024) highWatermark0
* snapshot-split-1 info : [1024, 2048) highWatermark1
* the data output is:
* only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
* only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
* </pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
// ...
Object[] chunkKey = SplitKeyUtils.getSplitKey(splitKeyType, nameAdjuster, target);
FinishedSnapshotSplitInfo matchedSplit =
SplitKeyUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey);
return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
}
BinlogSplitReader.shouldEmit()에서는 어떤 binlog 이벤트에 대해 메인 스트림에 내보낼지 여부를 결정합니다. 따라서 만약 값이 false라면 타겟에 반영되지 않는 것이죠.
동작 원리
- binlog 이벤트의 PK값으로 어떤 snapshot chunk에 속하는지 binary search
- 해당 chunk의 개별 High Watermark 이후인 이벤트만 emit
- 이전이면 버림 (이미 chunk 정규화에서 반영된 데이터)
구체적 예시
chunk-0: PK [0, 1000) → highWatermark = 150
chunk-1: PK [1000, 2000) → highWatermark = 280
chunk-2: PK [2000, 3000] → highWatermark = 350
binlog streaming 시작 위치: min(150, 280, 350) = 150
| binlog pos | 이벤트 | PK | 속한 chunk | HW | 판정 |
|---|---|---|---|---|---|
| 155 | INSERT | 500 | chunk-0 | 150 | 155 > 150 → emit |
| 160 | INSERT | 1500 | chunk-1 | 280 | 160 < 280 → drop |
| 290 | UPDATE | 1500 | chunk-1 | 280 | 290 > 280 → emit |
position 160의 PK=1500 INSERT의 경우 position이 highWatermark보다 낮기 때문에 이미 반영된 것으로 판단 후 버립니다.
이처럼 chunk별 개별 High Watermark 기준 필터링 덕분에 중복이 발생하지 않는 것을 보장할 수 있습니다.
예외: backfill skip 옵션
MySqlSnapshotSplitReadTask.java에서 scan.incremental.snapshot.backfill.skip 옵션이 true이면 backfill을 건너뛰고 High Watermark를 Low Watermark와 동일하게 설정하는 경우가 있습니다.
if (isBackfillSkipped) {
// Directly set HW = LW if backfill is skipped. Binlog events created during snapshot
// phase could be processed later in binlog reading phase.
//
// Note that this behaviour downgrades the delivery guarantee to at-least-once. We can't
// promise that the snapshot is exactly the view of the table at low watermark moment,
// so binlog events created during snapshot might be replayed later in binlog reading
// phase.
highWatermark = lowWatermark;
이 경우에는 중복 가능성이 존재합니다. 다만 기본값은 false이므로 정상 사용 시 exactly-once가 보장됩니다.
궁금해서 이 옵션이 왜 존재하는지 한 번 찾아봤는데요, 아래와 같은 이슈와 동기를 찾을 수 있었습니다.
https://github.com/apache/flink-cdc/issues/2553
이슈 & 동기
Exactly-once를 보장하는 이런 Backfill Log 과정이 때에 따라서는 무겁고, 소스에 부하가 많이 가는 작업일 수 있습니다.
이슈에서 예시로 든 Postgres의 경우 변경 데이터를 읽기 위해 Replication Slot과 연결이 필요한데, Parallelism 수에 맞게 개별적인 연결을 유지하면서 + 동시에 스냅샷을 읽는 도중 데이터가 바뀌면 보정(Backfill) 합니다.
Parallelism을 4로 두면, 일단 4개의 커넥터가 필요하고, Enumerator에서 관리자용 연결이 또 하나 필요합니다. (일단 5개)
만약 DB 소스가 10개라면 5 * 10 = 50개나 되어버리는 것이죠.
따라서 이 작업은 At-least-once로 처리해 부하를 줄이자!라는 것입니다.
어차피 타겟 DB에서 upsert 옵션이 되어있다면, 같은 이벤트가 여러 번 가도 멱등성을 보장하기 때문에 정합성에 문제가 없다는 판단입니다. 이러면 backfill 과정도 생략되며 계속 연결을 유지해야 하는 커넥션의 수도 줄어듭니다.
Flink에서는 Exactly-once를 위해 더 정교한 작업을 처리하려다 보니 이런 이슈도 발생할 수 있는 것 같습니다.
만약 굳이 필요 없다면 scan.incremental.snapshot.backfill.skip 옵션을 true로 두는 것도 충분히 괜찮은 선택 같네요.
정리
| 단계 | 역할 | 핵심 코드 |
|---|---|---|
| Chunk 정규화 | snapshot + binlog merge로 HW 시점 일관성 확보 | SnapshotSplitReader.pollWithBuffer() |
| Per-Chunk 필터링 | PK 기반으로 chunk별 HW 이후 이벤트만 통과 | BinlogSplitReader.shouldEmit() |
"가장 낮은 High Watermark부터 binlog을 읽지만, 각 레코드가 속한 chunk의 개별 HW 이후 이벤트만 emit 한다." 이것이 exactly-once를 달성하는 DBLog 알고리즘의 핵심입니다.
그리고 이 알고리즘은 Debezium의 Incremental Snapshot이 동작하는 원리와 동일합니다.
코드를 보다 보니 이 부분에 대한 설명이 주석에 잘 적혀있는 것을 보았습니다.
혹시 좀 더 깊게 보고 싶으신 분들은 링크에 표기된 코드 본문을 따라가면 블로그 글에 기재된 내용 이상의 무언가(?)을 얻으실 수도!
참고
'오픈소스' 카테고리의 다른 글
| [flink-cdc] Iceberg sink connector에서의 default value 지원 (0) | 2026.02.14 |
|---|---|
| [flink-cdc] VARIANT 타입과 PARSE_JSON 함수 (0) | 2026.01.31 |
| [flink-cdc] MySQL 커넥터의 BIGINT UNSIGNED 무한 Chunk Splitting 버그 (0) | 2026.01.18 |