https://github.com/apache/flink-cdc/pull/4249
[FLINK-38985][docs] Add documentation for VARIANT type and PARSE_JSON functions by suhwan-cheon · Pull Request #4249 · apache/
Summary Add documentation for VARIANT type support and PARSE_JSON/TRY_PARSE_JSON functions introduced in recent PRs. (in https://issues.apache.org/jira/browse/FLINK-38874 issue - sub tasks) Notes ...
github.com
Flink 2.1.0에서 반정형 데이터를 지원하는 VARIANT 타입과 이를 파싱하기 위한 PARSE_JSON가 도입되었습니다.
Flink cdc 라이브러리에서도 YAML 형태의 파이프라인에서 이에 대응하기 위해 기능을 추가했고, 이에 대한 Docs 작업 중 알게된 것들을 정리합니다.
VARIANT 타입
https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/types/#variant
Data Types
Data Types # Flink SQL has a rich set of native data types available to users. Data Type # A data type describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations. Flink’s data types are
nightlies.apache.org
VARIANT는 반정형 데이터(semi-structured data)를 위한 타입입니다
반정형 데이터란 정해진 규격이 없는 데이터로 JSON, MAP과 같이 유연하게 사용할 수 있는 타입을 의미합니다.
예를 들어 동일한 JSON 타입이래도 어떤 레코드에는 있는 필드가 다른 레코드에는 없을 수도 있습니다.
{
"이름": "홍길동",
"직업": "개발자",
"기술": ["Python", "SQL", "Cloud"]
}
해당 타입의 장점은 테이블 스키마의 변경 없이도 새로운 필드를 추가할 수 있다는 점입니다.
일반적으로 NOSQL에 저장되는 데이터가 그러하며, MySQL과 같은 RDBMS에서도 이런 필드를 지원합니다.
PARSE_JSON 함수
System (Built-in) Functions
System (Built-in) Functions # Flink Table API & SQL provides users with a set of built-in functions for data transformations. This page gives a brief overview of them. If a function that you need is not supported yet, you can implement a user-defined funct
nightlies.apache.org
JSON 문자열을 VARIANT 타입으로 파싱하는 기능을 합니다.
Flink에서는 두 번째 인자로 allow_duplicate_keys 옵션을 제공하는데요, 이는 중복 키에 대해 어떻게 처리할지 여부를 지정합니다.
만약 allow_duplicate_keys 값이 true라면 키의 중복을 허용하고, 나중에 나온 Value를 최종값으로 정합니다.
예를 들어 아래와 같은 JSON 값이 있을 때
{
"name": "suhwan",
"age": 20,
"age": 25 <-- 나중에 나온 25가 채택
}
처음 나온 20은 무시되며, 25가 age의 Value로 채택되는 것이죠.
보통 이런 옵션은 예측이 불가능하기 때문에 잘 사용하지 않을 것 같고.. 기본값도 false입니다. 중복시 에러를 뱉어내게 됩니다.
추가로 TRY_PARSE_JSON 함수가 있는데요, 이 함수는 PARSE_JSON과 거의 비슷하나 JSON 형식에 맞지 않는 경우 NULL을 반환하도록 합니다. JSON 파싱 실패 시 전체 작업이 실패하는 것을 방지하거나, COALESCE(TRY_PARSE_JSON(col), default_value) 같은 패턴으로 fallback 처리하고 싶은 경우 사용할 수 있습니다.
Flink CDC에서 이 타입을 지원한 이유
만약 Flink CDC가 지원하는 Source, Sink DB에 VARIANT 타입을 지원하는 곳이 없다면, 이 기능은 의미가 없습니다.
제가 사용하는 MySQL, Iceberg에는 이러한 타입이 없어서 찾아본 결과, Apache Paimon에서 이런 타입을 제공해주고 있었습니다.
따라서 Paimon의 Sink connector 부분에 관련 PR이 반영되었고, 이를 지원하기 위해 VARIANT 타입 관련 코드가 정말 많이 추가되었습니다. 대부분 Flink 코어 레포의 변경 사항을 그대로 가져온거긴 하지만 CDC 라이브러리 운영도 참 쉽지 않다는 생각을 했네요..
TO DO
작업 중 Flink SQL은 Calcite -> Janino 형태로 해석 및 처리되는 것을 알았습니다.
Calcite는 사용자가 입력한 SQL 문장을 컴퓨터가 이해할 수 있는 트리 구조로 변환 및 최적화하는 기능이며
Janino는 Calcite가 세운 계획을 실제 실행 가능한 Java 코드로 변환한 뒤 실시간으로 컴파일해서 실행하는 역할을 한다고 합니다.
-> 추후 Flink SQL 내부 코드를 보며 공부해보기
'오픈소스' 카테고리의 다른 글
| [flink-cdc] Iceberg sink connector에서의 default value 지원 (0) | 2026.02.14 |
|---|---|
| [flink-cdc] MySQL 커넥터의 BIGINT UNSIGNED 무한 Chunk Splitting 버그 (0) | 2026.01.18 |