Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");// 압축 알고리즘 설정props.put("compression.type", "lz4");// 관련 배치 설정 (압축 효율에 영향)props.put("batch.size", "65536"); // 큰 배치 = 높은 압축률props.put("linger.ms", "10");
압축 타입별 설정
// gzip - 최고 압축률props.put("compression.type", "gzip");// snappy - 빠른 속도props.put("compression.type", "snappy");// lz4 - 권장 (균형)props.put("compression.type", "lz4");// zstd - 최신 권장 (Kafka 2.1+)props.put("compression.type", "zstd");// none - 압축 없음props.put("compression.type", "none");
압축 레벨 (zstd)
// zstd 압축 레벨 (1-22, 기본 3)// 높을수록 압축률 증가, 속도 감소// 빠른 압축props.put("compression.zstd.level", "1");// 기본값props.put("compression.zstd.level", "3");// 최대 압축props.put("compression.zstd.level", "22");
Broker 설정
Topic 레벨 압축
# 토픽 생성 시 압축 설정kafka-topics.sh --create \ --topic compressed-topic \ --bootstrap-server localhost:9092 \ --config compression.type=lz4
Broker 압축 정책
# server.properties# producer 설정 유지 (기본값)compression.type = producer# 강제 압축compression.type = lz4# 압축 없음compression.type = uncompressed
// Consumer는 자동으로 압축 해제// 별도 설정 불필요Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 압축된 메시지도 자동으로 해제되어 반환ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) { // record.value()는 압축 해제된 상태 String value = record.value();}
압축 정보 확인
// Record 헤더에서 압축 정보 확인ConsumerRecord<String, String> record = ...;// 압축 타입은 배치 레벨에서 관리되어 직접 조회 어려움// Kafka 도구로 확인 가능
배치와 압축
배치 압축 원리
┌─────────────────────────────────────────────────────────────────┐
│ Batch Compression │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Record Batch (압축 전): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Header │ Record 1 │ Record 2 │ Record 3 │ ... │ Record N │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ 압축 │
│ ┌───────────────────────────────────────┐ │
│ │ Header │ Compressed Records │ │
│ │ │ (모든 레코드가 함께 압축됨) │ │
│ └───────────────────────────────────────┘ │
│ │
│ 배치 크기가 클수록 압축 효율 증가 │
│ │
└─────────────────────────────────────────────────────────────────┘
최적 배치 설정
// 압축 효율을 위한 배치 설정Properties props = new Properties();// 큰 배치 = 높은 압축률props.put("batch.size", "131072"); // 128KBprops.put("linger.ms", "20"); // 20ms 대기props.put("compression.type", "lz4");// 작은 배치 = 낮은 압축률 (비권장)// props.put("batch.size", "1024"); // 1KB// props.put("linger.ms", "0");
┌─────────────────────────────────────────────────────────────────┐
│ Trade-off Analysis │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 네트워크 병목 (느린 네트워크): │
│ → 압축률 높은 알고리즘 선택 (gzip, zstd) │
│ → CPU 여유 있으면 최고 압축률 │
│ │
│ CPU 병목 (빠른 네트워크): │
│ → 빠른 알고리즘 선택 (lz4, snappy) │
│ → 또는 압축 비활성화 │
│ │
│ 균형잡힌 환경: │
│ → lz4 또는 zstd 권장 │
│ │
└─────────────────────────────────────────────────────────────────┘
압축 관련 모니터링
Producer 메트릭:
kafka.producer:type=producer-metrics,client-id={client-id}
├── compression-rate-avg # 평균 압축률
├── record-size-avg # 평균 레코드 크기 (압축 전)
└── batch-size-avg # 평균 배치 크기 (압축 후)
계산:
압축률 = 1 - (batch-size-avg / (record-size-avg * records-per-batch))
알고리즘 선택 가이드
선택 플로우:
네트워크 대역폭이 제한적인가?
├── Yes → 압축률 우선
│ ├── Kafka 2.1+ → zstd
│ └── 이전 버전 → gzip
│
└── No → CPU 사용률이 높은가?
├── Yes → 압축 없음 또는 lz4
└── No → lz4 권장 (기본 선택)
압축 문제 해결
압축으로 인한 지연
// 증상: Producer 지연 증가// 원인: gzip 압축의 CPU 오버헤드// 해결 1: 빠른 알고리즘으로 변경props.put("compression.type", "lz4");// 해결 2: 배치 크기 조정props.put("batch.size", "32768"); // 작은 배치
낮은 압축률
// 증상: 기대보다 낮은 압축률// 원인: 작은 배치 또는 이미 압축된 데이터// 해결 1: 배치 크기 증가props.put("batch.size", "131072");props.put("linger.ms", "50");// 해결 2: 데이터 특성 확인// 이미 압축된 데이터는 추가 압축 효과 없음
메모리 문제
// 증상: OutOfMemoryError// 원인: 압축 버퍼 메모리 사용// 해결: 버퍼 크기 조정props.put("buffer.memory", "67108864"); // 64MBprops.put("batch.size", "65536"); // 64KB
댓글 (0)