Schema 관리
Kafka에서 메시지 스키마를 효과적으로 관리하기 위한 방법과 Schema Registry 활용법을 살펴봅니다.
Schema 관리의 필요성
문제 상황
┌─────────────────────────────────────────────────────────────────┐
│ Schema Evolution Problem │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer v1 Consumer v1 │
│ { "name": "Alice", 기대: name, email │
│ "email": "alice@..." } ✓ 정상 동작 │
│ │
│ Producer v2 (필드 추가) Consumer v1 (업데이트 안됨) │
│ { "name": "Alice", 기대: name, email │
│ "email": "alice@...", phone 필드 무시? 에러? │
│ "phone": "123-456" } ❓ 불명확 │
│ │
│ Producer v3 (필드 제거) Consumer v2 │
│ { "name": "Alice", 기대: name, email, phone │
│ "email": "alice@..." } phone 없음 │
│ ❌ 에러 가능 │
│ │
└─────────────────────────────────────────────────────────────────┘
Schema Registry 역할
┌─────────────────────────────────────────────────────────────────┐
│ Schema Registry Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────────┐ ┌─────────┐ │
│ │ Producer │──────▶ │ Schema Registry │ ◀──────│Consumer │ │
│ └─────────────┘ └─────────────────┘ └─────────┘ │
│ │ │ │ │
│ │ │ │ │
│ │ 1. 스키마 등록/조회 │ │ │
│ │ 2. 호환성 검사 │ 3. 스키마 조회 │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Kafka Cluster │ │
│ │ 메시지 = Schema ID (4 bytes) + 직렬화된 데이터 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Schema Registry 설정
설치 및 실행
# Confluent Schema Registry (Docker)
docker run -d \
--name schema-registry \
-p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 \
confluentinc/cp-schema-registry:latest
# Health check
curl http://localhost:8081/클라이언트 설정
// Producer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", KafkaAvroSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Consumer 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("key.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://localhost:8081");
consumerProps.put("specific.avro.reader", "true");Avro Schema 정의
기본 스키마
{
"type": "record",
"name": "User",
"namespace": "com.example.events",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"}
]
}복잡한 스키마
{
"type": "record",
"name": "Order",
"namespace": "com.example.events",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
}
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
},
{
"name": "shippingAddress",
"type": ["null", {
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "country", "type": "string"}
]
}],
"default": null
},
{"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"}
]
}Schema 호환성
호환성 유형
┌─────────────────────────────────────────────────────────────────┐
│ Compatibility Types │
├─────────────────────────────────────────────────────────────────┤
│ │
│ BACKWARD (기본값) │
│ - 새 스키마로 이전 데이터 읽기 가능 │
│ - Consumer 먼저 업그레이드 │
│ - 필드 추가: 기본값 필수 │
│ - 필드 제거: 가능 │
│ │
│ FORWARD │
│ - 이전 스키마로 새 데이터 읽기 가능 │
│ - Producer 먼저 업그레이드 │
│ - 필드 추가: 가능 │
│ - 필드 제거: 기본값 필수 │
│ │
│ FULL │
│ - BACKWARD + FORWARD │
│ - 순서 상관없이 업그레이드 가능 │
│ - 필드 추가/제거: 둘 다 기본값 필수 │
│ │
│ NONE │
│ - 호환성 검사 없음 │
│ - 주의 필요 │
│ │
└─────────────────────────────────────────────────────────────────┘
호환성 설정
# 전역 호환성 설정
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Subject별 호환성 설정
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'Schema Evolution
BACKWARD 호환 변경
// v1: 원본 스키마
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
// v2: 필드 추가 (기본값 포함)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phone", "type": ["null", "string"], "default": null}
]
}
// v3: 필드 제거
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
// email 제거됨 - BACKWARD 호환
]
}FORWARD 호환 변경
// v1: 원본 스키마
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
// v2: 필드 제거 (기본값 추가)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
// email 제거 - 이전 Reader가 기본값 사용
]
}비호환 변경 (피해야 함)
❌ 필드 타입 변경: string → int
❌ 필드명 변경: email → emailAddress
❌ 필수 필드 추가 (기본값 없이)
❌ enum 값 제거
Schema Registry API
스키마 등록
# 스키마 등록
curl -X POST http://localhost:8081/subjects/users-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"
}'
# 응답: {"id": 1}스키마 조회
# 모든 subjects
curl http://localhost:8081/subjects
# 특정 subject의 버전들
curl http://localhost:8081/subjects/users-value/versions
# 특정 버전의 스키마
curl http://localhost:8081/subjects/users-value/versions/1
# 최신 스키마
curl http://localhost:8081/subjects/users-value/versions/latest
# ID로 스키마 조회
curl http://localhost:8081/schemas/ids/1호환성 검사
# 호환성 테스트
curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}'
# 응답: {"is_compatible": true}Java 코드 예제
Producer
// Avro 레코드 정의 (자동 생성 또는 GenericRecord)
@AvroGenerated
public class User extends SpecificRecordBase {
private long id;
private String name;
private String email;
// getter, setter...
}
// Producer
public class AvroProducer {
public void sendUser(User user) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, User> record = new ProducerRecord<>(
"users",
String.valueOf(user.getId()),
user
);
producer.send(record).get();
}
}
}Consumer
public class AvroConsumer {
public void consumeUsers() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-processor");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("users"));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.printf("User: id=%d, name=%s%n",
user.getId(), user.getName());
}
}
}
}
}GenericRecord 사용
// Schema를 모르는 경우 (동적 처리)
props.put("specific.avro.reader", "false");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord value = record.value();
Object id = value.get("id");
Object name = value.get("name");
System.out.printf("Record: id=%s, name=%s%n", id, name);
}다른 직렬화 포맷
JSON Schema
// JSON Schema Serializer
props.put("value.serializer", KafkaJsonSchemaSerializer.class);
props.put("value.deserializer", KafkaJsonSchemaDeserializer.class);
// POJO 직접 사용 가능
public class UserJson {
private long id;
private String name;
private String email;
}Protobuf
// user.proto
syntax = "proto3";
package com.example.events;
message User {
int64 id = 1;
string name = 2;
string email = 3;
}// Protobuf Serializer
props.put("value.serializer", KafkaProtobufSerializer.class);
props.put("value.deserializer", KafkaProtobufDeserializer.class);포맷 비교
| 특성 | Avro | JSON Schema | Protobuf |
|---|---|---|---|
| 크기 | 작음 | 큼 | 매우 작음 |
| 속도 | 빠름 | 느림 | 매우 빠름 |
| 스키마 진화 | 우수 | 보통 | 우수 |
| 가독성 | 낮음 (바이너리) | 높음 | 낮음 (바이너리) |
| 생태계 | Kafka 친화적 | 범용 | Google 친화적 |
Best Practices
1. 호환성 정책
✓ 프로덕션: BACKWARD 또는 FULL
✓ 개발: BACKWARD 또는 NONE (주의)
✓ Subject별로 적절한 호환성 설정
2. 스키마 버전 관리
✓ Git에 스키마 파일 관리
✓ CI/CD에서 호환성 검사 자동화
✓ 버전 태깅 및 릴리스 노트
3. 네이밍 컨벤션
Subject 이름: {topic-name}-{key|value}
예: orders-value, users-key
Namespace: com.company.domain.events
Record 이름: 의미 있는 이름 (User, Order, Event)
4. 필드 설계
// 좋은 예: 확장 가능한 설계
{
"name": "metadata",
"type": ["null", {
"type": "map",
"values": "string"
}],
"default": null
}
// 피해야 할 예: 타입 변경 필요한 설계
{"name": "count", "type": "int"} // 나중에 long 필요할 수 있음5. 모니터링
# Schema Registry 상태 모니터링
curl http://localhost:8081/subjects # Subject 수
curl http://localhost:8081/schemas # 스키마 수
# 메트릭
- schema_registry_registered_count
- schema_registry_compatibility_check_failures
댓글 (0)