Producer 주요 설정
Kafka Producer의 성능, 신뢰성, 처리량을 최적화하기 위한 주요 설정들을 다룹니다.
필수 설정
bootstrap.servers
- 설명: Kafka 클러스터에 연결하기 위한 브로커 목록
- 타입: List
- 기본값: 없음 (필수)
- 예제:
"localhost:9092"또는"broker1:9092,broker2:9092"
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");권장사항:
- 최소 2개 이상의 브로커 지정 (고가용성)
- 모든 브로커를 나열할 필요는 없음 (클러스터 정보는 자동 검색)
key.serializer
- 설명: 메시지 키를 직렬화하는 클래스
- 타입: Class
- 기본값: 없음 (필수)
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");자주 사용되는 Serializer:
StringSerializer: 문자열IntegerSerializer: 정수LongSerializer: Long 타입ByteArraySerializer: 바이트 배열DoubleSerializer: 실수- 커스텀 Serializer 구현 가능
value.serializer
- 설명: 메시지 값을 직렬화하는 클래스
- 타입: Class
- 기본값: 없음 (필수)
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");신뢰성 관련 설정
acks
- 설명: Producer가 메시지 전송 성공으로 간주하기 위해 필요한 브로커 응답 수
- 타입: String
- 기본값:
"all"(Kafka 3.0+),"1"(이전 버전) - 가능한 값:
"0","1","all"(또는"-1")
acks=0
props.put("acks", "0");- 브로커 응답을 기다리지 않음
- 최고 처리량, 최저 신뢰성
- 메시지 손실 가능성 높음
사용 사례: 로그, 메트릭 등 손실이 허용되는 데이터
acks=1
props.put("acks", "1");- 리더 브로커만 응답하면 성공
- 균형잡힌 성능과 신뢰성
- 리더 장애 시 메시지 손실 가능
사용 사례: 대부분의 일반적인 경우
acks=all (또는 -1)
props.put("acks", "all");- 리더와 모든 ISR 레플리카가 응답해야 성공
- 최고 신뢰성, 낮은 처리량
min.insync.replicas와 함께 사용
사용 사례: 금융 거래, 주문 등 손실이 절대 불가한 데이터
// 최고 신뢰성 설정
props.put("acks", "all");
props.put("min.insync.replicas", "2"); // 브로커 설정retries
- 설명: 전송 실패 시 재시도 횟수
- 타입: Integer
- 기본값:
Integer.MAX_VALUE(Kafka 2.1+) - 범위: 0 ~ Integer.MAX_VALUE
props.put("retries", 3); // 최대 3번 재시도권장사항:
- 일시적 오류를 처리하기 위해 높은 값 설정
delivery.timeout.ms와 함께 사용하여 전체 재시도 시간 제한
// 무한 재시도, 단 2분 이내
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000); // 2분retry.backoff.ms
- 설명: 재시도 간 대기 시간
- 타입: Long
- 기본값:
100ms - 범위: 0 ~ Long.MAX_VALUE
props.put("retry.backoff.ms", 100); // 100ms 대기 후 재시도max.in.flight.requests.per.connection
- 설명: 응답을 받기 전에 전송할 수 있는 최대 요청 수
- 타입: Integer
- 기본값:
5 - 범위: 1 ~ Integer.MAX_VALUE
props.put("max.in.flight.requests.per.connection", 5);순서 보장:
1로 설정하면 순서 보장 (성능 저하)enable.idempotence=true이면 5까지 가능하면서 순서 보장
// 순서 보장이 중요한 경우
props.put("max.in.flight.requests.per.connection", 1);
// 또는 멱등성 사용 (권장)
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5); // 성능 유지enable.idempotence
- 설명: 멱등성 Producer 활성화
- 타입: Boolean
- 기본값:
true(Kafka 3.0+)
props.put("enable.idempotence", true);효과:
- 중복 메시지 방지
- 순서 보장 (max.in.flight.requests.per.connection ⇐ 5)
- 자동으로
acks=all,retries=MAX,max.in.flight.requests<=5설정
관련 문서: Idempotent Producer
delivery.timeout.ms
- 설명: 메시지 전송의 최대 대기 시간
- 타입: Integer
- 기본값:
120000ms(2분) - 범위: 0 ~ Integer.MAX_VALUE
props.put("delivery.timeout.ms", 120000); // 2분관계:
delivery.timeout.ms >= linger.ms + request.timeout.ms
request.timeout.ms
- 설명: 개별 요청의 최대 대기 시간
- 타입: Integer
- 기본값:
30000ms(30초)
props.put("request.timeout.ms", 30000);처리량 관련 설정
batch.size
- 설명: 배치당 최대 바이트 크기
- 타입: Integer
- 기본값:
16384(16KB) - 범위: 0 ~ Integer.MAX_VALUE
props.put("batch.size", 16384); // 16KB튜닝:
- 증가: 처리량 향상, 메모리 사용 증가
- 감소: 레이턴시 감소, 처리량 감소
// 고처리량 환경
props.put("batch.size", 32768); // 32KB
// 저레이턴시 환경
props.put("batch.size", 1024); // 1KBlinger.ms
- 설명: 배치를 전송하기 전 대기 시간
- 타입: Long
- 기본값:
0(즉시 전송) - 범위: 0 ~ Long.MAX_VALUE
props.put("linger.ms", 10); // 10ms 대기동작:
batch.size에 도달하거나linger.ms시간이 지나면 전송- 처리량을 위해 약간의 레이턴시 허용
// 처리량 우선
props.put("linger.ms", 100); // 100ms 대기
props.put("batch.size", 65536); // 64KB
// 레이턴시 우선
props.put("linger.ms", 0); // 즉시 전송buffer.memory
- 설명: Producer가 사용할 수 있는 전체 메모리
- 타입: Long
- 기본값:
33554432(32MB) - 범위: 0 ~ Long.MAX_VALUE
props.put("buffer.memory", 33554432); // 32MB동작:
- 버퍼가 가득 차면
max.block.ms동안 블로킹 - 초과 시
BufferExhaustedException발생
// 고처리량 환경
props.put("buffer.memory", 67108864); // 64MBmax.block.ms
- 설명: 버퍼가 가득 찼을 때 블로킹 최대 시간
- 타입: Long
- 기본값:
60000ms(60초)
props.put("max.block.ms", 60000);compression.type
- 설명: 메시지 압축 알고리즘
- 타입: String
- 기본값:
"none" - 가능한 값:
"none","gzip","snappy","lz4","zstd"
props.put("compression.type", "snappy");압축 알고리즘 비교:
| 알고리즘 | 압축률 | 속도 | CPU 사용 | 권장 사용 |
|---|---|---|---|---|
| none | - | 가장 빠름 | 낮음 | 작은 메시지, CPU 제한 |
| gzip | 가장 높음 | 느림 | 높음 | 저장 공간 중요 |
| snappy | 중간 | 빠름 | 낮음 | 균형잡힌 선택 |
| lz4 | 낮음 | 매우 빠름 | 낮음 | 높은 처리량 |
| zstd | 높음 | 중간 | 중간 | Kafka 2.1+, 균형 |
// 균형잡힌 설정 (권장)
props.put("compression.type", "snappy");
// 압축률 우선
props.put("compression.type", "zstd");
// 속도 우선
props.put("compression.type", "lz4");메시지 크기 관련 설정
max.request.size
- 설명: 요청의 최대 크기
- 타입: Integer
- 기본값:
1048576(1MB)
props.put("max.request.size", 1048576); // 1MB주의:
- 브로커의
message.max.bytes설정과 일치해야 함 - 토픽의
max.message.bytes설정도 확인 필요
// 큰 메시지를 전송하는 경우
props.put("max.request.size", 10485760); // 10MB
// 브로커 설정도 변경 필요
// message.max.bytes=10485760메타데이터 관련 설정
metadata.max.age.ms
- 설명: 메타데이터 강제 갱신 주기
- 타입: Long
- 기본값:
300000ms(5분)
props.put("metadata.max.age.ms", 300000);보안 관련 설정
security.protocol
- 타입: String
- 기본값:
"PLAINTEXT" - 가능한 값:
"PLAINTEXT","SSL","SASL_PLAINTEXT","SASL_SSL"
// SSL 사용
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");설정 조합 예제
1. 고신뢰성 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 신뢰성 최우선
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
props.put("delivery.timeout.ms", 120000);
// 압축으로 네트워크 최적화
props.put("compression.type", "snappy");2. 고처리량 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 처리량 최우선
props.put("acks", "1");
props.put("batch.size", 65536); // 64KB
props.put("linger.ms", 100);
props.put("buffer.memory", 67108864); // 64MB
props.put("compression.type", "lz4");
props.put("max.in.flight.requests.per.connection", 5);3. 저레이턴시 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 레이턴시 최우선
props.put("acks", "1");
props.put("batch.size", 1024); // 1KB
props.put("linger.ms", 0); // 즉시 전송
props.put("compression.type", "none");4. 순서 보장 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 순서 보장 (멱등성 사용)
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("max.in.flight.requests.per.connection", 5);
// 또는 전통적 방식
// props.put("max.in.flight.requests.per.connection", 1);설정 선택 가이드
신뢰성 vs 성능 트레이드오프
신뢰성 ↑
↑
acks=all, idempotence=true
|
acks=1
|
acks=0
↓
성능 ↑
사용 사례별 권장 설정
금융 거래, 주문 처리
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);로그, 메트릭 수집
props.put("acks", "1");
props.put("batch.size", 32768);
props.put("linger.ms", 50);
props.put("compression.type", "snappy");실시간 이벤트 스트리밍
props.put("acks", "1");
props.put("linger.ms", 10);
props.put("compression.type", "lz4");CDC (Change Data Capture)
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 1); // 순서 보장모니터링할 메트릭
Producer 설정 튜닝 시 다음 메트릭을 모니터링:
record-send-rate: 초당 전송 레코드 수batch-size-avg: 평균 배치 크기compression-rate-avg: 압축률record-queue-time-avg: 큐 대기 시간request-latency-avg: 요청 레이턴시buffer-available-bytes: 사용 가능한 버퍼 메모리
일반적인 실수
1. batch.size와 linger.ms 잘못된 조합
// ✗ 작은 batch.size + 긴 linger.ms
props.put("batch.size", 1024);
props.put("linger.ms", 1000);
// 배치가 빨리 차지 않아 항상 1초 대기
// ✓ 적절한 조합
props.put("batch.size", 16384);
props.put("linger.ms", 100);2. 순서 보장 설정 누락
// ✗ 순서가 중요한데 설정 안 함
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", false);
// ✓ 멱등성으로 순서 보장
props.put("enable.idempotence", true);3. 브로커 설정과 불일치
// ✗ Producer의 max.request.size가 브로커보다 큼
props.put("max.request.size", 10485760); // 10MB
// 브로커: message.max.bytes=1048576 (1MB)
// 메시지 전송 실패!
댓글 (0)