Consumer 주요 설정
Kafka Consumer의 동작을 제어하는 핵심 설정들을 살펴봅니다.
필수 설정
bootstrap.servers
브로커 연결 주소입니다.
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");- 최소 2-3개 브로커 지정 권장
- 전체 클러스터 메타데이터는 자동으로 가져옴
group.id
Consumer Group 식별자입니다.
props.put("group.id", "my-consumer-group");- 같은 group.id를 가진 Consumer들이 하나의 그룹 형성
assign()사용 시에는 선택적
key.deserializer / value.deserializer
메시지 역직렬화 클래스입니다.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");기본 제공 Deserializer:
StringDeserializerByteArrayDeserializerIntegerDeserializerLongDeserializerDoubleDeserializer
Fetch 관련 설정
fetch.min.bytes
한 번의 fetch 요청에서 가져올 최소 데이터 크기입니다.
props.put("fetch.min.bytes", "1"); // 기본값: 1| 값 | 효과 |
|---|---|
| 작은 값 | 낮은 레이턴시, 높은 요청 빈도 |
| 큰 값 | 높은 처리량, 높은 레이턴시 |
fetch.max.wait.ms
fetch.min.bytes를 만족하지 않을 때 대기하는 최대 시간입니다.
props.put("fetch.max.wait.ms", "500"); // 기본값: 500msfetch.min.bytes와 함께 동작- 둘 중 하나의 조건이 충족되면 응답 반환
조건: fetch.min.bytes = 1MB, fetch.max.wait.ms = 500ms
시나리오 1: 100ms 내에 1MB 도달 → 100ms 후 응답
시나리오 2: 500ms까지 500KB만 수집 → 500ms 후 응답 (500KB)
fetch.max.bytes
한 번의 fetch 요청에서 가져올 최대 데이터 크기입니다.
props.put("fetch.max.bytes", "52428800"); // 기본값: 50MB- Consumer 메모리 사용량에 영향
- 브로커 설정
message.max.bytes보다 커야 함
max.partition.fetch.bytes
파티션당 가져올 최대 데이터 크기입니다.
props.put("max.partition.fetch.bytes", "1048576"); // 기본값: 1MB- 파티션 수 × max.partition.fetch.bytes ≈ 메모리 사용량
- 큰 메시지가 있으면 더 크게 설정 필요
Poll 관련 설정
max.poll.records
한 번의 poll()에서 반환할 최대 레코드 수입니다.
props.put("max.poll.records", "500"); // 기본값: 500| 시나리오 | 권장값 |
|---|---|
| 빠른 처리 | 높은 값 (1000+) |
| 느린 처리 | 낮은 값 (100-200) |
| 메모리 제한 | 낮은 값 |
max.poll.interval.ms
poll() 호출 간 최대 허용 시간입니다.
props.put("max.poll.interval.ms", "300000"); // 기본값: 5분초과 시 영향:
- Consumer가 그룹에서 제외됨
- 리밸런싱 발생
Timeline:
poll() ────── processing ────── poll()
|←─── max.poll.interval.ms ───→|
처리 시간이 이 값을 초과하면:
→ Consumer "dead"로 간주
→ 리밸런싱 트리거
권장 설정:
// 처리 시간이 긴 경우
props.put("max.poll.interval.ms", "600000"); // 10분
props.put("max.poll.records", "100"); // 배치 크기 줄이기Session 관련 설정
session.timeout.ms
Consumer 세션 타임아웃입니다.
props.put("session.timeout.ms", "45000"); // 기본값: 45초 (Kafka 3.0+)- 하트비트가 이 시간 동안 없으면 Consumer 죽은 것으로 간주
- 너무 짧으면: GC pause 등으로 잘못된 리밸런싱
- 너무 길면: 장애 감지 지연
heartbeat.interval.ms
하트비트 전송 주기입니다.
props.put("heartbeat.interval.ms", "3000"); // 기본값: 3초권장 관계:
session.timeout.ms >= 3 × heartbeat.interval.ms
설정 예시:
props.put("session.timeout.ms", "10000"); // 10초
props.put("heartbeat.interval.ms", "3000"); // 3초Offset 관련 설정
enable.auto.commit
자동 오프셋 커밋 여부입니다.
props.put("enable.auto.commit", "true"); // 기본값: true| 값 | 효과 |
|---|---|
| true | 자동 커밋, 구현 간단, 중복 가능 |
| false | 수동 커밋, 정밀 제어, 안전 |
auto.commit.interval.ms
자동 커밋 주기입니다.
props.put("auto.commit.interval.ms", "5000"); // 기본값: 5초enable.auto.commit=true일 때만 적용- 짧으면: 오버헤드 증가
- 길면: 중복 처리 증가
auto.offset.reset
오프셋이 없을 때의 시작 위치입니다.
props.put("auto.offset.reset", "latest"); // 기본값: latest| 값 | 동작 |
|---|---|
| latest | 최신 메시지부터 |
| earliest | 처음부터 |
| none | 예외 발생 |
파티션 할당 설정
partition.assignment.strategy
파티션 할당 전략입니다.
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");주요 전략:
| 전략 | 클래스 | 특징 |
|---|---|---|
| Range | RangeAssignor | 기본값, 토픽별 범위 할당 |
| RoundRobin | RoundRobinAssignor | 균등 분배 |
| Sticky | StickyAssignor | 기존 할당 유지 |
| CooperativeSticky | CooperativeStickyAssignor | 점진적 리밸런싱 |
권장: CooperativeStickyAssignor (Kafka 2.4+)
props.put("partition.assignment.strategy",
Arrays.asList(
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
));group.instance.id
Static Membership을 위한 고정 ID입니다.
props.put("group.instance.id", "consumer-instance-1");- 설정 시 Static Member로 동작
- 일시적 연결 끊김에도 파티션 유지
- 롤링 재시작 시 리밸런싱 방지
네트워크 설정
request.timeout.ms
요청 타임아웃입니다.
props.put("request.timeout.ms", "30000"); // 기본값: 30초default.api.timeout.ms
Admin API 호출 타임아웃입니다.
props.put("default.api.timeout.ms", "60000"); // 기본값: 60초connections.max.idle.ms
유휴 연결 유지 시간입니다.
props.put("connections.max.idle.ms", "540000"); // 기본값: 9분reconnect.backoff.ms
재연결 대기 시간입니다.
props.put("reconnect.backoff.ms", "50"); // 기본값: 50ms
props.put("reconnect.backoff.max.ms", "1000"); // 최대 대기 시간보안 설정
SSL 설정
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");
props.put("ssl.keystore.location", "/path/to/keystore.jks");
props.put("ssl.keystore.password", "password");
props.put("ssl.key.password", "password");SASL 설정
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"user\" password=\"password\";");설정 조합 예시
저지연 처리
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "low-latency-group");
// Fetch 설정 - 빠른 응답
props.put("fetch.min.bytes", "1");
props.put("fetch.max.wait.ms", "100");
// Poll 설정 - 작은 배치
props.put("max.poll.records", "100");
// Session 설정 - 빠른 감지
props.put("session.timeout.ms", "10000");
props.put("heartbeat.interval.ms", "3000");고처리량 처리
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "high-throughput-group");
// Fetch 설정 - 큰 배치
props.put("fetch.min.bytes", "1048576"); // 1MB
props.put("fetch.max.wait.ms", "500");
props.put("fetch.max.bytes", "104857600"); // 100MB
// Poll 설정 - 큰 배치
props.put("max.poll.records", "10000");
props.put("max.poll.interval.ms", "600000"); // 10분안정적 처리
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "reliable-group");
// Offset 설정 - 수동 커밋
props.put("enable.auto.commit", "false");
// 할당 설정 - 안정적 할당
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("group.instance.id", "stable-consumer-1");
// Session 설정 - 여유있는 타임아웃
props.put("session.timeout.ms", "45000");
props.put("heartbeat.interval.ms", "10000");느린 처리 (배치 작업)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "batch-processing-group");
// 긴 처리 시간 허용
props.put("max.poll.interval.ms", "1800000"); // 30분
props.put("max.poll.records", "50"); // 작은 배치
// 세션 유지
props.put("session.timeout.ms", "60000");
props.put("heartbeat.interval.ms", "10000");설정 요약표
| 설정 | 기본값 | 영향 |
|---|---|---|
fetch.min.bytes | 1 | 레이턴시 vs 처리량 |
fetch.max.wait.ms | 500ms | 레이턴시 |
max.poll.records | 500 | 배치 크기 |
max.poll.interval.ms | 5분 | 처리 시간 허용 |
session.timeout.ms | 45초 | 장애 감지 속도 |
heartbeat.interval.ms | 3초 | 그룹 조정 빈도 |
enable.auto.commit | true | 오프셋 관리 방식 |
auto.offset.reset | latest | 초기 위치 |
댓글 (0)