1. 개요
Flink CDC 파이프라인은 "재처리"를 피할 수 없다.
분산 스트리밍 시스템은 이론적으로는 아름답지만, 현실은 변수가 많습니다. 소싱하는 Kafka 토픽의 파티션이 갑자기 늘어나고, 노드가 OOM으로 죽고, 디스크가 가득 차고, GC로 인해 타임아웃이 발생할 수 있습니다.
Apache Flink 기반의 CDC 파이프라인도 마찬가지입니다.
현시점 기준으로 스트리밍 파이프라인 중 운영 환경에서 장애를 고려하지 않은 시스템은 아마 없을 것이라 생각합니다.
Flink는 이러한 장애에 대응하기 위해 체크포인트(Checkpoint) 메커니즘을 제공하고 있습니다. 주기적으로 스트림 처리 상태를 스냅샷으로 저장하고, 장애 발생 시 마지막으로 성공한 체크포인트로부터 복구합니다.
단, 이번 포스팅에서 이야기하고 싶은 내용은 여기서 발생하는 핵심적인 문제입니다.
체크포인트 이후에 처리되었지만 아직 다음 체크포인트에 포함되지 않은 이벤트들은, 체크포인트 복구 후 다시 한 번 전달됩니다. 즉, 같은 이벤트가 싱크(Sink)에 두 번 이상 도착할 수 있습니다.
이것은 Flink만의 문제는 아닌 게, Kafka Consumer의 오프셋 리셋, Debezium CDC 소스 커넥터의 binlog position 재설정이 이런 문제를 야기할 수 있고, 심지어 "그냥" 잘못 보내줬던 케이스가 존재할 수도 있어서 실 서비스에서 같은 데이터가 두 번 이상 처리되는 상황은 자연스러운 현상입니다.
그럼 항상 섬뜩한 생각이 따라오는데,, "같은 이벤트가 두 번 싱크되면 데이터가 꼬이지 않을까?"라는 것입니다.
여러 답변이 있을 수 있겠지만, 이번 글에서는 Flink CDC 코드베이스의 실제 구현을 통해 이 질문에 대한 답을 알아보려 합니다.
결론부터 말하면 Upsert 기반의 멱등한 쓰기(idempotent write)가 그 해답입니다.
2. 배경 - Delivery Semantics 3종 비교
일반적으로 분산 메시징 시스템에서 메시지 전달 보장(Delivery Semantics)은 세 가지 수준으로 나뉘게 됩니다.
| 보장 수준 | 의미 | 비용 |
|---|---|---|
| At-most-once | 유실 가능, 중복 없음 | 낮음 |
| At-least-once | 유실 없음, 중복 가능 | 중간 |
| Exactly-once | 유실 없음, 중복 없음 | 높음 (2PC 등 필요) |
- At-most-once (최대 한 번): 가장 단순하게, 메시지를 보내고 확인하지 않는 방법입니다. 유실은 될 수 있지만 중복은 허용하지 않습니다. 로그 수집처럼 일부 유실이 허용되는 경우에 적합하지만, CDC 파이프라인에서는 데이터 유실이 곧 데이터 불일치이므로 선택지가 될 수 없습니다.
- Exactly-once (정확히 한 번): 모든 시스템이 원하는 이상적인 방법입니다. 메시지 유실도 없고, 중복도 없습니다. 이를 달성하려면 2-Phase Commit(2PC)이나 트랜잭셔널 싱크 같은 무거운 메커니즘이 필요하게 됩니다. 분산 환경에서 2PC는 코디네이터 장애, 타임아웃, 참여자 간 불일치 등 수많은 엣지 케이스를 처리해야 하며, 처리량(throughput)도 크게 떨어지게 됩니다.
- At-least-once (최소 한 번): 많은 CDC 시스템이 선택하는 현실적인 타협점입니다. 메시지 유실은 막되, 중복은 허용합니다. Flink에서는 Exactly-once를 제공한다고 하지만, 이는 Flink State의 일관성이고 Flink CDC에서는 동일한 레코드의 Sink가 여러 번 될 수 있습니다.
따라서 Flink CDC의 주요 싱크 커넥터들(Iceberg, Paimon, Doris, Fluss..)은 대부분 이 현실적인 선택을 따릅니다.
공식 문서에서도 아래와 같은 문구가 명시되어 있습니다.
"Not support exactly-once. The connector uses at-least-once + primary key table for idempotent writing."
단, 여기서 눈여겨봐야 할 점은 Exactly-once를 구현하지 않아도 upsert + PK 기반 멱등성으로 동일한 결과를 달성할 수 있다는 것입니다. 언뜻 보면 말장난 같은데, 이게 어떻게 가능한지 살펴봅니다.
3. 멱등성(Idempotency)이란?
멱등성(Idempotency) 은 수학 용어입니다. 함수 f가 멱등하다는 것은 다음을 만족하게 됩니다.
f(f(x)) = f(x)
같은 연산을 한 번 적용하든 두 번 적용하든, 결과가 동일하다는 뜻입니다.
대표적으로 절댓값 함수의 경우 ||-3|| = |-3| = 3 처럼 여러 번 씌워도 결과는 동일합니다.
이 성질이 분산 시스템에서 왜 중요한지는 DB 테이블에 발생 가능한 DML을 통해 직관적으로 이해할 수 있습니다.
- INSERT는 멱등하지 않습니다.
INSERT INTO users VALUES (1, 'Alice')를 두 번 실행하면, 첫 번째는 성공하지만 두 번째는 Primary Key 충돌 에러가 발생합니다. (PK가 없다면 같은 행이 두 개 삽입됩니다.) 어느 쪽이든 "한 번 실행한 것과 같은 결과"가 아니다.
- UPSERT는 멱등합니다.
UPSERT INTO users VALUES (1, 'Alice')를 두 번 실행하면, 첫 번째는 행을 삽입하고, 두 번째는 이미 존재하는 행을 같은 값으로 덮어씁니다. 결과는 동일하게(1, 'Alice')한 행이 남습니다.
- DELETE도 멱등합니다.
-
DELETE FROM users WHERE pk = 1을 두 번 실행하면, 첫 번째는 행을 삭제하고, 두 번째는 삭제할 행이 없으므로 아무 일도 하지 않는다. (단, 에러를 던지지 않아야 합니다.)
-
여기서 주목할 점은 당연하게도 UPSERT입니다.
UPSERT의 의미를 한 문장으로 정리하면 "존재하면 덮어쓰고, 없으면 삽입한다."라는 것이고, 이 연산은 본질적으로 멱등합니다. 같은 PK에 같은 값을 몇 번을 써도 최종 상태는 항상 같습니다.
바로 이 성질을 이용해 Flink CDC에서는 at-least-once 환경에서의 중복 전달 문제를 해결합니다.
4. Flink CDC 커넥터별 구현
실제 Sink 커넥터들은 이를 어떻게 구현하고 있는지 살펴봅니다.
4-1. Iceberg: Equality Delete + Checkpoint Deduplication
Iceberg 싱크 커넥터는 두 단계로 멱등성을 보장합니다.
1단계: 이벤트를 Upsert 시맨틱으로 변환
RowDataUtils.java는 CDC의 DataChangeEvent를 Iceberg가 이해하는 RowData로 변환하는 역할을 합니다. 여기서 핵심은 INSERT, UPDATE, REPLACE를 모두 RowKind.INSERT로 매핑한다는 점입니다.
// RowDataUtils.java (Lines 36-53)
// flink-cdc-pipeline-connector-iceberg/.../utils/RowDataUtils.java
switch (dataChangeEvent.op()) {
case INSERT:
case UPDATE:
case REPLACE:
{
recordData = dataChangeEvent.after();
kind = RowKind.INSERT;
break;
}
case DELETE:
{
recordData = dataChangeEvent.before();
kind = RowKind.DELETE;
break;
}
default:
throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
}
단순히 UPDATE를 INSERT로 변환해 버리면 기존 행이 남는다고 생각할 수 있는데요, 여기서 Iceberg의 equality-delete 메커니즘이 등장합니다.
IcebergWriter.java에서 RowDataTaskWriterFactory를 생성할 때, 테이블 스키마의 identifierFieldIds(= PK 필드 ID 목록)을 같이 전달하게 됩니다.
// IcebergWriter.java (Lines 133-147)
// flink-cdc-pipeline-connector-iceberg/.../v2/IcebergWriter.java
private RowDataTaskWriterFactory getRowDataTaskWriterFactory(TableId tableId) {
Table table = catalog.loadTable(TableIdentifier.parse(tableId.identifier()));
RowType rowType = FlinkSchemaUtil.convert(table.schema());
RowDataTaskWriterFactory rowDataTaskWriterFactory =
new RowDataTaskWriterFactory(
table,
rowType,
DEFAULT_MAX_FILE_SIZE,
FileFormat.fromString(DEFAULT_FILE_FORMAT),
new HashMap<>(),
new ArrayList<>(table.schema().identifierFieldIds()),
true);
rowDataTaskWriterFactory.initialize(taskId, attemptId);
return rowDataTaskWriterFactory;
}
table.schema().identifierFieldIds()는 Iceberg 테이블 스키마에서 PK(식별자 필드)들의 ID 목록을 가져옵니다. 이 목록이 비어 있지 않으면, Iceberg는 내부적으로 PartitionedDeltaWriter를 생성하게 됩니다. 이 Writer는 INSERT 행을 기록할 때, 같은 PK를 가진 기존 행을 무효화하는 동등 삭제(equality-delete) 파일을 함께 생성합니다. 따라서 UPDATE를 RowKind.INSERT로 내려보내도 equality-delete 파일이 함께 생성되어 올바른 upsert 시맨틱이 보장되는 것이죠.
단, 같은 PK(pk=1)에 대해 UPSERT가 두 번 도착하는 경우엔 두 가지 상황이 존재할 수 있습니다.
- 동일한 체크포인트에 포함되지 않는 경우
- 동등 삭제 파일의 경우 이전 체크포인트(커밋)된 데이터 파일에 대해서만 적용이 되기 때문에 두 번째 UPSERT의 데이터만 남게 되어 중복 적용의 영향이 없습니다.
- 두 개의 UPSERT가 동일한 체크포인트에 포함되는 경우
- Writer는 이미 삽입한 키를 insertedRows Map으로 추적하고 있기 때문에 position delete 기록 후 가장 마지막 UPSERT의 데이터만 남깁니다. 결과적으로 둘 다 중복 적용의 영향이 없습니다.
2단계: Checkpoint ID 기반 중복 커밋 방지
Writer 수준의 멱등성에 더해, Committer 수준에서도 중복된 체크포인트 커밋을 막습니다.
IcebergCommitter.java는 커밋 전에 현재 테이블의 스냅샷 히스토리를 조회하여, 같은 Checkpoint ID로 이미 커밋이 되었는지 확인합니다.
// IcebergCommitter.java (Lines 114-127)
// flink-cdc-pipeline-connector-iceberg/.../v2/IcebergCommitter.java
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Iterable<Snapshot> ancestors =
SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
long lastCheckpointId =
getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
if (lastCheckpointId == checkpointId) {
LOGGER.warn(
"Checkpoint id {} has been committed to table {}, skipping",
checkpointId,
tableId.identifier());
continue;
}
}
커밋이 실제로 실행될 때는 Checkpoint ID를 스냅샷 메타데이터에 기록합니다.
// IcebergCommitter.java (Lines 183-192)
private static void commitOperation(
SnapshotUpdate<?> operation,
String newFlinkJobId,
String operatorId,
long checkpointId) {
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
operation.set(SinkUtil.OPERATOR_ID, operatorId);
operation.commit();
}
이렇게 Iceberg 싱크는 두 단계로 멱등성을 보장합니다.
- step 1 (레코드 수준): Upsert 시맨틱 + equality-delete로 같은 PK의 중복 레코드가 와도 최종 상태가 동일하다.
- step 2 (커밋 수준): Checkpoint ID를 스냅샷에 기록하여, 같은 체크포인트의 파일을 두 번 커밋하는 것 자체를 방지한다.
4-2. Paimon: LSM-Tree Merge
Apache Paimon은 LSM-Tree 기반의 테이블 스토어로, PK 테이블에서 같은 키의 다중 레코드는 compaction 시 merge됩니다. Paimon의 merge 전략에서 기본값은 "deduplicate" -> 즉, 같은 PK의 가장 최신 레코드를 유지하는 것입니다.
PaimonWriterHelper.java는 두 가지 변환 메서드를 제공하는데요. 첫 번째는 단순 변환으로, Iceberg과 동일하게 INSERT/UPDATE/REPLACE를 모두 RowKind.INSERT로 매핑하는 방식입니다.
// PaimonWriterHelper.java (Lines 210-236)
// flink-cdc-pipeline-connector-paimon/.../v2/PaimonWriterHelper.java
public static GenericRow convertEventToGenericRow(
DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
GenericRow genericRow;
RecordData recordData;
switch (dataChangeEvent.op()) {
case INSERT:
case UPDATE:
case REPLACE:
{
recordData = dataChangeEvent.after();
genericRow = new GenericRow(RowKind.INSERT, recordData.getArity());
break;
}
case DELETE:
{
recordData = dataChangeEvent.before();
genericRow = new GenericRow(RowKind.DELETE, recordData.getArity());
break;
}
...
}
...
}
두 번째는 full changelog 변환으로, UPDATE를 UPDATE_BEFORE + UPDATE_AFTER 쌍으로 분리하는 방식입니다.
// PaimonWriterHelper.java (Lines 239-280)
public static List<GenericRow> convertEventToFullGenericRows(
DataChangeEvent dataChangeEvent,
List<RecordData.FieldGetter> fieldGetters,
boolean hasPrimaryKey) {
List<GenericRow> fullGenericRows = new ArrayList<>();
switch (dataChangeEvent.op()) {
case INSERT:
{
fullGenericRows.add(
convertRecordDataToGenericRow(
dataChangeEvent.after(), fieldGetters, RowKind.INSERT));
break;
}
case UPDATE:
case REPLACE:
{
if (hasPrimaryKey) {
fullGenericRows.add(
convertRecordDataToGenericRow(
dataChangeEvent.before(),
fieldGetters,
RowKind.UPDATE_BEFORE));
}
fullGenericRows.add(
convertRecordDataToGenericRow(
dataChangeEvent.after(), fieldGetters, RowKind.UPDATE_AFTER));
break;
}
case DELETE:
{
if (hasPrimaryKey) {
fullGenericRows.add(
convertRecordDataToGenericRow(
dataChangeEvent.before(), fieldGetters, RowKind.DELETE));
}
break;
}
...
}
return fullGenericRows;
}
여기서 주목할 점은 hasPrimaryKey 조건입니다.
PK가 있는 테이블에서는 UPDATE_BEFORE + UPDATE_AFTER 쌍을 생성하고, PK가 없으면 UPDATE_AFTER만 생성합니다. PK 테이블의 경우 UPDATE_BEFORE와 UPDATE_AFTER가 중복 적용되어도, Paimon의 LSM-Tree merge 과정에서 같은 키의 레코드들은 "가장 최신 값"으로 merge되기 때문에 Compaction이 완료되면 PK당 하나의 최종 행만 남으므로 결과적으로 멱등합니다.
PK가 없는 테이블의 경우 어떤 행을 삭제할지 식별할 키가 없기 때문에 보통 append-only로 쓰입니다. 따라서 UPDATE_AFTER만 생성하게 됩니다.
4-3. Fluss: PK 유무에 따른 자동 분기
Fluss 싱크 커넥터는 테이블에 Primary Key가 있는지 없는지에 따라 완전히 다른 Writer를 생성합니다.
// FlussSinkWriter.java (Lines 115-124)
// flink-cdc-pipeline-connector-fluss/.../v2/FlussSinkWriter.java
Table table = connection.getTable(tablePath);
TableWriter writer;
if (table.getTableInfo().hasPrimaryKey()) {
writer = table.newUpsert().createWriter();
} else {
writer = table.newAppend().createWriter();
}
tableMap.put(tablePath, table);
writerMap.put(tablePath, writer);
PK가 있으면 UpsertWriter를, 없으면 AppendWriter를 사용합니다.
당연하게도 실제 쓰기 시에 Writer 타입에 따라 연산이 분기됩니다.
// FlussSinkWriter.java (Lines 160-191)
private CompletableFuture<?> write(
TableWriter writer, FlussOperationType opType, InternalRow row, TablePath tablePath)
throws IOException {
if (writer instanceof UpsertWriter) {
UpsertWriter upsertWriter = (UpsertWriter) writer;
if (opType == FlussOperationType.UPSERT) {
return upsertWriter.upsert(row);
} else if (opType == FlussOperationType.DELETE) {
return upsertWriter.delete(row);
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported operation type: %s for primary key table %s",
opType, tablePath));
}
} else if (writer instanceof AppendWriter) {
AppendWriter appendWriter = (AppendWriter) writer;
if (opType == FlussOperationType.APPEND) {
return appendWriter.append(row);
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported operation type: %s for log table %s",
opType, tablePath));
}
} else {
throw new UnsupportedOperationException(...);
}
}
UpsertWriter는 upsert와 delete 두 가지 연산만 지원한다. 위에서 언급했듯이 upsert는 멱등하고 delete도 멱등하므로, 중복 적용에 안전합니다. 반면 AppendWriter는 append 연산만 지원합니다. PK가 없는 append-only 테이블에서는 upsert가 불가능하고, 중복 이벤트가 도착하면 같은 행이 두 번 삽입됩니다.
Paimon에서도 그렇듯, PK가 없는 경우엔 upsert 기반 멱등성의 한계를 보여주게 됩니다.
5. Upsert가 재처리를 할 수 있게 하는 3가지 조건
지금까지 Flink CDC에서 여러 싱크 커넥터가 재처리에 대한 중복 적용 위험에 대해 UPSERT 시멘틱을 통해 해결하는 것을 살펴보았습니다. 마지막으로 UPSERT가 재처리를 가능하게끔 하는 필수 요건에 대해 살펴보고 마무리합니다.
- PK가 반드시 필요합니다.
- DELETE 연산 시, 없는 레코드를 삭제해도 에러가 발생하면 안 됩니다.
- 같은 키에 여러 번 업데이트가 발생했을 때, 가장 마지막에 쓰인 값이 최종 값이 되어야 합니다.
글을 작성하며 여러 공식 문서를 살펴보았는데요, 가장 중요한 전제 조건은 Primary Key의 존재입니다.
PK가 없는 테이블에서는 upsert가 불가능하기 때문인데요, 이 밖에도 초기 스냅샷 시 PK가 없다면 정합성이 어긋날 위험도 있는 등 CDC 연동 과정에서 반드시 필요한 필드입니다. 만약 CDC 파이프라인 연동 과정에서 PK가 존재하지 않는 테이블이라면.. 다시 생각해 볼 필요가 있겠습니다. 😅