Schema 관리

Kafka에서 메시지 스키마를 효과적으로 관리하기 위한 방법과 Schema Registry 활용법을 살펴봅니다.

Schema 관리의 필요성

문제 상황

┌─────────────────────────────────────────────────────────────────┐
│                  Schema Evolution Problem                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Producer v1                    Consumer v1                     │
│  { "name": "Alice",             기대: name, email               │
│    "email": "alice@..." }       ✓ 정상 동작                     │
│                                                                 │
│  Producer v2 (필드 추가)         Consumer v1 (업데이트 안됨)      │
│  { "name": "Alice",             기대: name, email               │
│    "email": "alice@...",        phone 필드 무시? 에러?          │
│    "phone": "123-456" }         ❓ 불명확                        │
│                                                                 │
│  Producer v3 (필드 제거)         Consumer v2                     │
│  { "name": "Alice",             기대: name, email, phone        │
│    "email": "alice@..." }       phone 없음                      │
│                                 ❌ 에러 가능                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Schema Registry 역할

┌─────────────────────────────────────────────────────────────────┐
│                    Schema Registry Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐        ┌─────────────────┐        ┌─────────┐ │
│  │  Producer   │──────▶ │ Schema Registry │ ◀──────│Consumer │ │
│  └─────────────┘        └─────────────────┘        └─────────┘ │
│        │                        │                        │      │
│        │                        │                        │      │
│        │  1. 스키마 등록/조회    │                        │      │
│        │  2. 호환성 검사         │  3. 스키마 조회        │      │
│        │                        │                        │      │
│        ▼                        ▼                        ▼      │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                     Kafka Cluster                        │   │
│  │  메시지 = Schema ID (4 bytes) + 직렬화된 데이터           │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Schema Registry 설정

설치 및 실행

# Confluent Schema Registry (Docker)
docker run -d \
  --name schema-registry \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 \
  confluentinc/cp-schema-registry:latest
 
# Health check
curl http://localhost:8081/

클라이언트 설정

// Producer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", KafkaAvroSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
 
// Consumer 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("key.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("value.deserializer", KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://localhost:8081");
consumerProps.put("specific.avro.reader", "true");

Avro Schema 정의

기본 스키마

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.events",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

복잡한 스키마

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
      }
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "shippingAddress",
      "type": ["null", {
        "type": "record",
        "name": "Address",
        "fields": [
          {"name": "street", "type": "string"},
          {"name": "city", "type": "string"},
          {"name": "country", "type": "string"}
        ]
      }],
      "default": null
    },
    {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

Schema 호환성

호환성 유형

┌─────────────────────────────────────────────────────────────────┐
│                    Compatibility Types                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  BACKWARD (기본값)                                               │
│  - 새 스키마로 이전 데이터 읽기 가능                              │
│  - Consumer 먼저 업그레이드                                      │
│  - 필드 추가: 기본값 필수                                        │
│  - 필드 제거: 가능                                               │
│                                                                 │
│  FORWARD                                                        │
│  - 이전 스키마로 새 데이터 읽기 가능                              │
│  - Producer 먼저 업그레이드                                      │
│  - 필드 추가: 가능                                               │
│  - 필드 제거: 기본값 필수                                        │
│                                                                 │
│  FULL                                                           │
│  - BACKWARD + FORWARD                                           │
│  - 순서 상관없이 업그레이드 가능                                  │
│  - 필드 추가/제거: 둘 다 기본값 필수                              │
│                                                                 │
│  NONE                                                           │
│  - 호환성 검사 없음                                              │
│  - 주의 필요                                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

호환성 설정

# 전역 호환성 설정
curl -X PUT http://localhost:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
 
# Subject별 호환성 설정
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Schema Evolution

BACKWARD 호환 변경

// v1: 원본 스키마
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
 
// v2: 필드 추가 (기본값 포함)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}
 
// v3: 필드 제거
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
    // email 제거됨 - BACKWARD 호환
  ]
}

FORWARD 호환 변경

// v1: 원본 스키마
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
 
// v2: 필드 제거 (기본값 추가)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
    // email 제거 - 이전 Reader가 기본값 사용
  ]
}

비호환 변경 (피해야 함)

❌ 필드 타입 변경: string → int
❌ 필드명 변경: email → emailAddress
❌ 필수 필드 추가 (기본값 없이)
❌ enum 값 제거

Schema Registry API

스키마 등록

# 스키마 등록
curl -X POST http://localhost:8081/subjects/users-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"
  }'
 
# 응답: {"id": 1}

스키마 조회

# 모든 subjects
curl http://localhost:8081/subjects
 
# 특정 subject의 버전들
curl http://localhost:8081/subjects/users-value/versions
 
# 특정 버전의 스키마
curl http://localhost:8081/subjects/users-value/versions/1
 
# 최신 스키마
curl http://localhost:8081/subjects/users-value/versions/latest
 
# ID로 스키마 조회
curl http://localhost:8081/schemas/ids/1

호환성 검사

# 호환성 테스트
curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  }'
 
# 응답: {"is_compatible": true}

Java 코드 예제

Producer

// Avro 레코드 정의 (자동 생성 또는 GenericRecord)
@AvroGenerated
public class User extends SpecificRecordBase {
    private long id;
    private String name;
    private String email;
    // getter, setter...
}
 
// Producer
public class AvroProducer {
    public void sendUser(User user) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
 
        try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, User> record = new ProducerRecord<>(
                "users",
                String.valueOf(user.getId()),
                user
            );
            producer.send(record).get();
        }
    }
}

Consumer

public class AvroConsumer {
    public void consumeUsers() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "user-processor");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true");
 
        try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("users"));
 
            while (true) {
                ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, User> record : records) {
                    User user = record.value();
                    System.out.printf("User: id=%d, name=%s%n",
                        user.getId(), user.getName());
                }
            }
        }
    }
}

GenericRecord 사용

// Schema를 모르는 경우 (동적 처리)
props.put("specific.avro.reader", "false");
 
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
 
for (ConsumerRecord<String, GenericRecord> record : records) {
    GenericRecord value = record.value();
    Object id = value.get("id");
    Object name = value.get("name");
    System.out.printf("Record: id=%s, name=%s%n", id, name);
}

다른 직렬화 포맷

JSON Schema

// JSON Schema Serializer
props.put("value.serializer", KafkaJsonSchemaSerializer.class);
props.put("value.deserializer", KafkaJsonSchemaDeserializer.class);
 
// POJO 직접 사용 가능
public class UserJson {
    private long id;
    private String name;
    private String email;
}

Protobuf

// user.proto
syntax = "proto3";
package com.example.events;
 
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
}
// Protobuf Serializer
props.put("value.serializer", KafkaProtobufSerializer.class);
props.put("value.deserializer", KafkaProtobufDeserializer.class);

포맷 비교

특성AvroJSON SchemaProtobuf
크기작음매우 작음
속도빠름느림매우 빠름
스키마 진화우수보통우수
가독성낮음 (바이너리)높음낮음 (바이너리)
생태계Kafka 친화적범용Google 친화적

Best Practices

1. 호환성 정책

✓ 프로덕션: BACKWARD 또는 FULL
✓ 개발: BACKWARD 또는 NONE (주의)
✓ Subject별로 적절한 호환성 설정

2. 스키마 버전 관리

✓ Git에 스키마 파일 관리
✓ CI/CD에서 호환성 검사 자동화
✓ 버전 태깅 및 릴리스 노트

3. 네이밍 컨벤션

Subject 이름: {topic-name}-{key|value}
예: orders-value, users-key

Namespace: com.company.domain.events
Record 이름: 의미 있는 이름 (User, Order, Event)

4. 필드 설계

// 좋은 예: 확장 가능한 설계
{
  "name": "metadata",
  "type": ["null", {
    "type": "map",
    "values": "string"
  }],
  "default": null
}
 
// 피해야 할 예: 타입 변경 필요한 설계
{"name": "count", "type": "int"}  // 나중에 long 필요할 수 있음

5. 모니터링

# Schema Registry 상태 모니터링
curl http://localhost:8081/subjects  # Subject 수
curl http://localhost:8081/schemas   # 스키마 수
 
# 메트릭
- schema_registry_registered_count
- schema_registry_compatibility_check_failures

관련 문서