Log Compaction

Log Compaction은 각 키의 최신 값만 유지하여 무한 Retention을 효율적으로 지원합니다.

Compaction 개요

기본 개념

┌─────────────────────────────────────────────────────────────────┐
│                      Log Compaction                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Before Compaction:                                             │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐             │
│  │K1:A │K2:B │K1:C │K3:D │K2:E │K1:F │K3:G │K4:H │             │
│  └─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘             │
│                                                                 │
│  After Compaction:                                              │
│  ┌─────┬─────┬─────┬─────┐                                     │
│  │K1:F │K2:E │K3:G │K4:H │                                     │
│  └─────┴─────┴─────┴─────┘                                     │
│     ↑     ↑     ↑     ↑                                        │
│   최신값만 유지                                                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Delete vs Compact 비교

특성DeleteCompact
삭제 기준시간/크기키 중복
데이터 보존기간 제한최신 값 영구 보존
사용 사례이벤트 로그상태/설정
메시지 키선택적필수

Compaction 설정

Topic 레벨

# Compaction 활성화
kafka-topics.sh --create \
    --topic compacted-topic \
    --bootstrap-server localhost:9092 \
    --config cleanup.policy=compact
 
# 또는 기존 토픽 수정
kafka-configs.sh --alter \
    --entity-type topics \
    --entity-name my-topic \
    --bootstrap-server localhost:9092 \
    --add-config cleanup.policy=compact

Compaction + Delete

# 둘 다 사용: Compact 후 Delete
kafka-topics.sh --create \
    --topic changelog \
    --bootstrap-server localhost:9092 \
    --config cleanup.policy=compact,delete \
    --config retention.ms=604800000  # 7일

Broker 설정

# Log Cleaner 스레드 수
log.cleaner.threads = 1
 
# Cleaner 버퍼 크기
log.cleaner.dedupe.buffer.size = 134217728  # 128MB
 
# 최소 Compaction 비율 (기본 50%)
log.cleaner.min.cleanable.ratio = 0.5
 
# Cleaner 활성화
log.cleaner.enable = true

Compaction 동작 원리

Head와 Tail

┌─────────────────────────────────────────────────────────────────┐
│                    Log Structure                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌────────────────────────┬────────────────────────────────┐   │
│  │         Tail           │             Head               │   │
│  │    (Compacted)         │       (Not Compacted)          │   │
│  │                        │                                │   │
│  │  ┌───┬───┬───┬───┐    │   ┌───┬───┬───┬───┬───┬───┐   │   │
│  │  │K1 │K2 │K3 │K4 │    │   │K1 │K2 │K1 │K5 │K3 │K6 │   │   │
│  │  │(F)│(E)│(G)│(H)│    │   │(X)│(Y)│(Z)│(A)│(B)│(C)│   │   │
│  │  └───┴───┴───┴───┘    │   └───┴───┴───┴───┴───┴───┘   │   │
│  │                        │                                │   │
│  └────────────────────────┴────────────────────────────────┘   │
│           ↑                            ↑                        │
│     Active Segment 제외          Active Segment                 │
│     모든 세그먼트 대상                (Compaction 제외)           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Compaction 과정

1. Log Cleaner가 dirty 세그먼트 식별
     ↓
2. 각 키의 오프셋 맵 생성
     ↓
3. 키별 최신 오프셋만 유지하도록 결정
     ↓
4. 새 세그먼트 파일 생성 (cleaned)
     ↓
5. 원본 세그먼트 교체

세그먼트 상태:
[original] → [cleaning] → [cleaned] → [swap]

Dirty Ratio

// Compaction 시작 조건
dirty_ratio = (head_bytes) / (total_bytes)
 
// 기본값: 50% 이상이면 Compaction 시작
log.cleaner.min.cleanable.ratio = 0.5

키 삭제 (Tombstone)

Tombstone 메시지

// 키 삭제: value를 null로 설정
producer.send(new ProducerRecord<>("topic", "key-to-delete", null));

Tombstone 동작

Before:
┌─────┬─────┬─────┐
│K1:A │K2:B │K1:C │
└─────┴─────┴─────┘

Send Tombstone (K1: null):
┌─────┬─────┬─────┬──────────┐
│K1:A │K2:B │K1:C │K1: null  │
└─────┴─────┴─────┴──────────┘

After Compaction:
┌─────┬──────────┐
│K2:B │K1: null  │  ← Tombstone 유지
└─────┴──────────┘

After delete.retention.ms:
┌─────┐
│K2:B │  ← Tombstone 삭제됨
└─────┘

Tombstone 보관 기간

# Tombstone 보관 기간 (기본 24시간)
log.cleaner.delete.retention.ms = 86400000

Compaction 관련 설정

주요 설정

설정기본값설명
log.cleaner.enabletrueCompaction 활성화
log.cleaner.threads1Cleaner 스레드 수
log.cleaner.min.cleanable.ratio0.5최소 dirty 비율
log.cleaner.delete.retention.ms24시간Tombstone 보관 기간
min.compaction.lag.ms0최소 Compaction 지연
max.compaction.lag.ms무한최대 Compaction 지연

세그먼트 설정

# 세그먼트 크기 (Compaction 단위)
log.segment.bytes = 1073741824  # 1GB
 
# 작은 세그먼트 = 더 자주 Compaction
# 큰 세그먼트 = 적은 Compaction, 더 많은 공간 필요

Compaction 지연

# 최소 지연: 메시지가 이 기간 동안 Compaction에서 제외
min.compaction.lag.ms = 0  # 기본값
 
# 예: 1시간 이내 메시지는 Compaction 제외
min.compaction.lag.ms = 3600000
 
# 최대 지연: 이 기간이 지나면 강제 Compaction
max.compaction.lag.ms = 9223372036854775807  # 기본값 (무한)

사용 사례

1. 데이터베이스 CDC

MySQL → Debezium → Kafka (Compacted) → Consumer

테이블 변경:
INSERT user (id=1, name="Alice") → K=1, V={name:"Alice"}
UPDATE user SET name="Alicia" WHERE id=1 → K=1, V={name:"Alicia"}
DELETE FROM user WHERE id=1 → K=1, V=null

Compaction 후:
K=1: {name:"Alicia"} 또는 삭제됨 (Tombstone)

2. 설정 저장소

// 애플리케이션 설정 저장
producer.send(new ProducerRecord<>("app-config", "db.url", "jdbc:mysql://..."));
producer.send(new ProducerRecord<>("app-config", "cache.ttl", "3600"));
 
// 설정 변경
producer.send(new ProducerRecord<>("app-config", "cache.ttl", "7200"));
 
// Consumer는 항상 최신 설정 조회 가능
// Compaction으로 불필요한 이전 값 제거

3. Kafka Streams State Store

KTable<String, Long> wordCounts = ...;

내부적으로 Compacted Topic 사용:
- changelog topic
- 각 키의 최신 상태만 유지
- 장애 복구 시 상태 복원

4. 사용자 프로필

// 사용자 프로필 업데이트
producer.send(new ProducerRecord<>("user-profiles", "user-123", profileJson));
 
// 변경 이력보다 현재 상태가 중요
// Compaction으로 최신 프로필만 유지

모니터링

JMX 메트릭

kafka.log:type=LogCleaner,name=cleaner-recopy-percent
→ 재복사된 데이터 비율 (낮을수록 효율적)

kafka.log:type=LogCleaner,name=max-clean-time-secs
→ 최대 Cleaning 시간

kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
→ Cleaner 버퍼 사용률

명령행 확인

# Compaction 상태 확인
kafka-log-dirs.sh --describe \
    --bootstrap-server localhost:9092 \
    --topic-list compacted-topic
 
# 로그 세그먼트 확인
ls -la /kafka-logs/compacted-topic-0/
 
# Cleaner 로그 확인
grep "Log cleaner" /var/log/kafka/server.log

주의사항

키 필수

// ❌ 잘못된 사용: 키 없음
producer.send(new ProducerRecord<>("compacted-topic", null, value));
// Compaction 불가, 영구 보관됨
 
// ✓ 올바른 사용: 키 포함
producer.send(new ProducerRecord<>("compacted-topic", key, value));

메모리 요구사항

# Cleaner 버퍼가 부족하면 Compaction 효율 저하
log.cleaner.dedupe.buffer.size = 134217728  # 128MB
 
# 권장: 고유 키 수 × 키 크기 이상
# 예: 1억 키 × 50 bytes = 5GB

순서 보장

Compaction은 키 내에서 순서만 보장:
K1:A → K2:B → K1:C → K2:D

Compaction 후:
K1:C → K2:D  (순서 유지)
또는
K2:D → K1:C  (키 간 순서 변경 가능)

관련 문서