Producer 주요 설정

Kafka Producer의 성능, 신뢰성, 처리량을 최적화하기 위한 주요 설정들을 다룹니다.

필수 설정

bootstrap.servers

  • 설명: Kafka 클러스터에 연결하기 위한 브로커 목록
  • 타입: List
  • 기본값: 없음 (필수)
  • 예제: "localhost:9092" 또는 "broker1:9092,broker2:9092"
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");

권장사항:

  • 최소 2개 이상의 브로커 지정 (고가용성)
  • 모든 브로커를 나열할 필요는 없음 (클러스터 정보는 자동 검색)

key.serializer

  • 설명: 메시지 키를 직렬화하는 클래스
  • 타입: Class
  • 기본값: 없음 (필수)
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

자주 사용되는 Serializer:

  • StringSerializer: 문자열
  • IntegerSerializer: 정수
  • LongSerializer: Long 타입
  • ByteArraySerializer: 바이트 배열
  • DoubleSerializer: 실수
  • 커스텀 Serializer 구현 가능

value.serializer

  • 설명: 메시지 값을 직렬화하는 클래스
  • 타입: Class
  • 기본값: 없음 (필수)
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

신뢰성 관련 설정

acks

  • 설명: Producer가 메시지 전송 성공으로 간주하기 위해 필요한 브로커 응답 수
  • 타입: String
  • 기본값: "all" (Kafka 3.0+), "1" (이전 버전)
  • 가능한 값: "0", "1", "all" (또는 "-1")

acks=0

props.put("acks", "0");
  • 브로커 응답을 기다리지 않음
  • 최고 처리량, 최저 신뢰성
  • 메시지 손실 가능성 높음

사용 사례: 로그, 메트릭 등 손실이 허용되는 데이터

acks=1

props.put("acks", "1");
  • 리더 브로커만 응답하면 성공
  • 균형잡힌 성능과 신뢰성
  • 리더 장애 시 메시지 손실 가능

사용 사례: 대부분의 일반적인 경우

acks=all (또는 -1)

props.put("acks", "all");
  • 리더와 모든 ISR 레플리카가 응답해야 성공
  • 최고 신뢰성, 낮은 처리량
  • min.insync.replicas와 함께 사용

사용 사례: 금융 거래, 주문 등 손실이 절대 불가한 데이터

// 최고 신뢰성 설정
props.put("acks", "all");
props.put("min.insync.replicas", "2");  // 브로커 설정

retries

  • 설명: 전송 실패 시 재시도 횟수
  • 타입: Integer
  • 기본값: Integer.MAX_VALUE (Kafka 2.1+)
  • 범위: 0 ~ Integer.MAX_VALUE
props.put("retries", 3);  // 최대 3번 재시도

권장사항:

  • 일시적 오류를 처리하기 위해 높은 값 설정
  • delivery.timeout.ms와 함께 사용하여 전체 재시도 시간 제한
// 무한 재시도, 단 2분 이내
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000);  // 2분

retry.backoff.ms

  • 설명: 재시도 간 대기 시간
  • 타입: Long
  • 기본값: 100ms
  • 범위: 0 ~ Long.MAX_VALUE
props.put("retry.backoff.ms", 100);  // 100ms 대기 후 재시도

max.in.flight.requests.per.connection

  • 설명: 응답을 받기 전에 전송할 수 있는 최대 요청 수
  • 타입: Integer
  • 기본값: 5
  • 범위: 1 ~ Integer.MAX_VALUE
props.put("max.in.flight.requests.per.connection", 5);

순서 보장:

  • 1로 설정하면 순서 보장 (성능 저하)
  • enable.idempotence=true이면 5까지 가능하면서 순서 보장
// 순서 보장이 중요한 경우
props.put("max.in.flight.requests.per.connection", 1);
 
// 또는 멱등성 사용 (권장)
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);  // 성능 유지

enable.idempotence

  • 설명: 멱등성 Producer 활성화
  • 타입: Boolean
  • 기본값: true (Kafka 3.0+)
props.put("enable.idempotence", true);

효과:

  • 중복 메시지 방지
  • 순서 보장 (max.in.flight.requests.per.connection 5)
  • 자동으로 acks=all, retries=MAX, max.in.flight.requests<=5 설정

관련 문서: Idempotent Producer

delivery.timeout.ms

  • 설명: 메시지 전송의 최대 대기 시간
  • 타입: Integer
  • 기본값: 120000ms (2분)
  • 범위: 0 ~ Integer.MAX_VALUE
props.put("delivery.timeout.ms", 120000);  // 2분

관계:

delivery.timeout.ms >= linger.ms + request.timeout.ms

request.timeout.ms

  • 설명: 개별 요청의 최대 대기 시간
  • 타입: Integer
  • 기본값: 30000ms (30초)
props.put("request.timeout.ms", 30000);

처리량 관련 설정

batch.size

  • 설명: 배치당 최대 바이트 크기
  • 타입: Integer
  • 기본값: 16384 (16KB)
  • 범위: 0 ~ Integer.MAX_VALUE
props.put("batch.size", 16384);  // 16KB

튜닝:

  • 증가: 처리량 향상, 메모리 사용 증가
  • 감소: 레이턴시 감소, 처리량 감소
// 고처리량 환경
props.put("batch.size", 32768);  // 32KB
 
// 저레이턴시 환경
props.put("batch.size", 1024);   // 1KB

linger.ms

  • 설명: 배치를 전송하기 전 대기 시간
  • 타입: Long
  • 기본값: 0 (즉시 전송)
  • 범위: 0 ~ Long.MAX_VALUE
props.put("linger.ms", 10);  // 10ms 대기

동작:

  • batch.size에 도달하거나 linger.ms 시간이 지나면 전송
  • 처리량을 위해 약간의 레이턴시 허용
// 처리량 우선
props.put("linger.ms", 100);     // 100ms 대기
props.put("batch.size", 65536);  // 64KB
 
// 레이턴시 우선
props.put("linger.ms", 0);       // 즉시 전송

buffer.memory

  • 설명: Producer가 사용할 수 있는 전체 메모리
  • 타입: Long
  • 기본값: 33554432 (32MB)
  • 범위: 0 ~ Long.MAX_VALUE
props.put("buffer.memory", 33554432);  // 32MB

동작:

  • 버퍼가 가득 차면 max.block.ms 동안 블로킹
  • 초과 시 BufferExhaustedException 발생
// 고처리량 환경
props.put("buffer.memory", 67108864);  // 64MB

max.block.ms

  • 설명: 버퍼가 가득 찼을 때 블로킹 최대 시간
  • 타입: Long
  • 기본값: 60000ms (60초)
props.put("max.block.ms", 60000);

compression.type

  • 설명: 메시지 압축 알고리즘
  • 타입: String
  • 기본값: "none"
  • 가능한 값: "none", "gzip", "snappy", "lz4", "zstd"
props.put("compression.type", "snappy");

압축 알고리즘 비교:

알고리즘압축률속도CPU 사용권장 사용
none-가장 빠름낮음작은 메시지, CPU 제한
gzip가장 높음느림높음저장 공간 중요
snappy중간빠름낮음균형잡힌 선택
lz4낮음매우 빠름낮음높은 처리량
zstd높음중간중간Kafka 2.1+, 균형
// 균형잡힌 설정 (권장)
props.put("compression.type", "snappy");
 
// 압축률 우선
props.put("compression.type", "zstd");
 
// 속도 우선
props.put("compression.type", "lz4");

메시지 크기 관련 설정

max.request.size

  • 설명: 요청의 최대 크기
  • 타입: Integer
  • 기본값: 1048576 (1MB)
props.put("max.request.size", 1048576);  // 1MB

주의:

  • 브로커의 message.max.bytes 설정과 일치해야 함
  • 토픽의 max.message.bytes 설정도 확인 필요
// 큰 메시지를 전송하는 경우
props.put("max.request.size", 10485760);  // 10MB
 
// 브로커 설정도 변경 필요
// message.max.bytes=10485760

메타데이터 관련 설정

metadata.max.age.ms

  • 설명: 메타데이터 강제 갱신 주기
  • 타입: Long
  • 기본값: 300000ms (5분)
props.put("metadata.max.age.ms", 300000);

보안 관련 설정

security.protocol

  • 타입: String
  • 기본값: "PLAINTEXT"
  • 가능한 값: "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"
// SSL 사용
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");

설정 조합 예제

1. 고신뢰성 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 신뢰성 최우선
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
props.put("delivery.timeout.ms", 120000);
 
// 압축으로 네트워크 최적화
props.put("compression.type", "snappy");

2. 고처리량 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 처리량 최우선
props.put("acks", "1");
props.put("batch.size", 65536);  // 64KB
props.put("linger.ms", 100);
props.put("buffer.memory", 67108864);  // 64MB
props.put("compression.type", "lz4");
props.put("max.in.flight.requests.per.connection", 5);

3. 저레이턴시 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 레이턴시 최우선
props.put("acks", "1");
props.put("batch.size", 1024);   // 1KB
props.put("linger.ms", 0);       // 즉시 전송
props.put("compression.type", "none");

4. 순서 보장 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 순서 보장 (멱등성 사용)
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("max.in.flight.requests.per.connection", 5);
 
// 또는 전통적 방식
// props.put("max.in.flight.requests.per.connection", 1);

설정 선택 가이드

신뢰성 vs 성능 트레이드오프

        신뢰성 ↑
            ↑
    acks=all, idempotence=true
            |
         acks=1
            |
         acks=0
            ↓
        성능 ↑

사용 사례별 권장 설정

금융 거래, 주문 처리

props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);

로그, 메트릭 수집

props.put("acks", "1");
props.put("batch.size", 32768);
props.put("linger.ms", 50);
props.put("compression.type", "snappy");

실시간 이벤트 스트리밍

props.put("acks", "1");
props.put("linger.ms", 10);
props.put("compression.type", "lz4");

CDC (Change Data Capture)

props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 1);  // 순서 보장

모니터링할 메트릭

Producer 설정 튜닝 시 다음 메트릭을 모니터링:

  • record-send-rate: 초당 전송 레코드 수
  • batch-size-avg: 평균 배치 크기
  • compression-rate-avg: 압축률
  • record-queue-time-avg: 큐 대기 시간
  • request-latency-avg: 요청 레이턴시
  • buffer-available-bytes: 사용 가능한 버퍼 메모리

일반적인 실수

1. batch.size와 linger.ms 잘못된 조합

// ✗ 작은 batch.size + 긴 linger.ms
props.put("batch.size", 1024);
props.put("linger.ms", 1000);
// 배치가 빨리 차지 않아 항상 1초 대기
 
// ✓ 적절한 조합
props.put("batch.size", 16384);
props.put("linger.ms", 100);

2. 순서 보장 설정 누락

// ✗ 순서가 중요한데 설정 안 함
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", false);
 
// ✓ 멱등성으로 순서 보장
props.put("enable.idempotence", true);

3. 브로커 설정과 불일치

// ✗ Producer의 max.request.size가 브로커보다 큼
props.put("max.request.size", 10485760);  // 10MB
// 브로커: message.max.bytes=1048576 (1MB)
// 메시지 전송 실패!

관련 문서