https://github.com/apache/flink-cdc/pull/4277
[FLINK-39055] [Iceberg] Support default column values in Iceberg sink connector by suhwan-cheon · Pull Request #4277 · apache/
Summary In the Iceberg table version 3, default value support for columns https://iceberg.apache.org/spec/#version-3-extended-types-and-capabilities Add default column value support for Iceberg...
github.com
Iceberg 1.8.0부터 소개된 table format 3에는 칼럼에 대해서 default value를 지원하는 기능이 포함되어 있습니다. 이에 대응하여 flink-cdc에서도 default value를 지원할 수 있도록 수정한 내용을 소개합니다.
배경
왜 Default Values가 필요한가?
실시간 CDC(Change Data Capture) 파이프라인을 운영하다 보면, 소스 데이터베이스에서 스키마 변경이 발생하는 것은 피할 수 없습니다. 특히 ALTER TABLE ADD COLUMN... DEFAULT 'value'처럼 기본값이 있는 컬럼을 추가하는 경우, 싱크 테이블에서도 이 기본값이 올바르게 반영되어야 합니다.
Flink CDC의 Iceberg 싱크 커넥터는 스키마 변경(컬럼 추가, 삭제, 타입 변경 등)은 지원했지만, 칼럼의 기본값(default value)은 무시하고 있었습니다. 이로 인해 다음과 같은 문제가 발생할 수 있습니다.

- 소스 DB에서 DEFAULT 'active'로 정의된 칼럼이 Iceberg에는 기본값 없이 생성됨 (스키마가 동일하지 않음)
- 새로 추가된 컬럼에 값이 없으면 null로 처리되어 소스와 싱크 간 데이터 불일치 발생
이 문제를 Iceberg에서 제공하는 API를 이용해 CDC 적재 시 메타데이터를 변경하여 해결해 보았습니다.
해결 방법
1. 기본값 파싱 및 변환
Flink CDC에서는 default value를 파싱 하는 함수는 기본적으로 지원하는데요, 이를 Iceberg가 이해할 수 있는 타입별 Literal <?> 객체로 변환하는 로직이 추가로 필요했습니다.
@Nullable
public static Literal<?> parseDefaultValue(
@Nullable String defaultValueExpression, DataType cdcType) {
if (defaultValueExpression == null) {
return null;
}
try {
switch (cdcType.getTypeRoot()) {
case CHAR:
case VARCHAR:
return Literal.of(defaultValueExpression);
case BOOLEAN:
if ("true".equalsIgnoreCase(defaultValueExpression)) {
return Literal.of(true);
} else if ("false".equalsIgnoreCase(defaultValueExpression)) {
return Literal.of(false);
} else {
LOG.warn(
"Invalid boolean default value '{}', skipping default value.",
defaultValueExpression);
return null;
}
case TINYINT:
case SMALLINT:
case INTEGER:
return Literal.of(Integer.parseInt(defaultValueExpression));
case BIGINT:
return Literal.of(Long.parseLong(defaultValueExpression));
case FLOAT:
return Literal.of(Float.parseFloat(defaultValueExpression));
case DOUBLE:
return Literal.of(Double.parseDouble(defaultValueExpression));
case DECIMAL:
int scale = DataTypes.getScale(cdcType).orElse(0);
return Literal.of(
new java.math.BigDecimal(defaultValueExpression)
.setScale(scale, java.math.RoundingMode.HALF_UP));
default:
LOG.warn(
"Unsupported default value type {} for expression '{}', skipping default value.",
cdcType.getTypeRoot(),
defaultValueExpression);
return null;
}
} catch (NumberFormatException e) {
LOG.warn(
"Failed to parse default value '{}' for type {}, skipping default value.",
defaultValueExpression,
cdcType.getTypeRoot(),
e);
return null;
}
}
지원하는 타입은 Flink CDC의 파싱 로직에 의존합니다.
여기서는 uuid(), CURRENT_TIMESTAMP()와 같은 불명확한 default value 값의 경우 처리하지 않는 것을 원칙으로 합니다.
2. 스키마 변경 시 default values 적용
default values가 적용되는 시나리오는 두 가지입니다.
#1. CREATE TABLE (테이블 최초 생성 시)
소스 DB 테이블을 Iceberg에 최초 생성할 때, 칼럼에 정의된 기본값을 함께 설정합니다.
// 테이블 생성 후 기본값 적용
Table table = catalog.createTable(tableIdentifier, icebergSchema, partitionSpec,
tableOptions);
applyDefaultValues(table, cdcSchema);
// applyDefaultValues는 Iceberg의 updateColumnDefault API를 사용합니다
private void applyDefaultValues(Table table, Schema cdcSchema) {
UpdateSchema updateSchema = null;
for (Column column : cdcSchema.getColumns()) {
Literal<?> defaultValue = IcebergTypeUtils.parseDefaultValue(
column.getDefaultValueExpression(), column.getType());
if (defaultValue != null) {
if (updateSchema == null) {
updateSchema = table.updateSchema();
}
updateSchema.updateColumnDefault(column.getName(), defaultValue);
}
}
if (updateSchema != null) {
updateSchema.commit();
}
}
코드 주석에도 명시했듯이 Iceberg의 updateColumnDefault API를 사용해 칼럼명과 기본값을 넘겨주면 됩니다.
이 방식은 Iceberg 메타데이터에 write-default를 설정합니다. 테이블이 방금 생성되어 기존 데이터가 없으므로, 이후 쓰기 시 기본값이 적용됩니다. write-default에 대해서는 밑에서 좀 더 자세히 설명하겠습니다.
#2. ADD COLUMN (칼럼을 추가할 때)
운영 중인 테이블에 default values를 가진 새 칼럼이 추가되는 상황입니다.
Literal<?> defaultValue = IcebergTypeUtils.parseDefaultValue(
addColumn.getDefaultValueExpression(), addColumn.getType());
if (defaultValue != null) {
updateSchema.addColumn(columnName, icebergType, columnComment,
defaultValue);
} else {
updateSchema.addColumn(columnName, icebergType, columnComment);
}
이 경우 좀 더 단순하게 Iceberg의 addColumn API에 기본값을 같이 넣어 전달합니다.
이 방식은 Iceberg 메타데이터에 initial-default, write-default 모두를 설정합니다.
initial-default, write-default 설명
- write-default: 이후 새로운 행을 쓸 때, 해당 칼럼에 값이 없으면 이 기본값 사용
- initial-default: 컬럼 추가 이전에 이미 기록된 데이터 파일을 읽을 때, 해당 칼럼의 기본값으로 사용 (Iceberg format v3부터 사용 가능)
Iceberg가 사용하는 메타데이터 구조의 최적화된 성질로 인해 이런 값을 추가로 넣는 것으로 보입니다.
굳이 데이터 파일에 default 값을 전부 넣을 필요 없이 조회 시점에 기본 값을 채워 넣는 것이죠.
주의 사항
initial-default는 Iceberg format v3에서 지원하는데요, 이게 쿼리 엔진(Spark, trino..)에서 이 기능을 지원하는지 확인이 필요합니다.
결국 이 키 값은 쿼리 엔진의 영향을 크게 받기 때문입니다.
- Spark의 경우 Iceberg 1.8.0+에서 parquet reader로 initial default 값에 대한 처리를 지원해 주는 것으로 확인됩니다.
'오픈소스' 카테고리의 다른 글
| [flink-cdc] VARIANT 타입과 PARSE_JSON 함수 (0) | 2026.01.31 |
|---|---|
| [flink-cdc] MySQL 커넥터의 BIGINT UNSIGNED 무한 Chunk Splitting 버그 (0) | 2026.01.18 |