Consumer 주요 설정

Kafka Consumer의 동작을 제어하는 핵심 설정들을 살펴봅니다.

필수 설정

bootstrap.servers

브로커 연결 주소입니다.

props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
  • 최소 2-3개 브로커 지정 권장
  • 전체 클러스터 메타데이터는 자동으로 가져옴

group.id

Consumer Group 식별자입니다.

props.put("group.id", "my-consumer-group");
  • 같은 group.id를 가진 Consumer들이 하나의 그룹 형성
  • assign() 사용 시에는 선택적

key.deserializer / value.deserializer

메시지 역직렬화 클래스입니다.

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

기본 제공 Deserializer:

  • StringDeserializer
  • ByteArrayDeserializer
  • IntegerDeserializer
  • LongDeserializer
  • DoubleDeserializer

Fetch 관련 설정

fetch.min.bytes

한 번의 fetch 요청에서 가져올 최소 데이터 크기입니다.

props.put("fetch.min.bytes", "1");  // 기본값: 1
효과
작은 값낮은 레이턴시, 높은 요청 빈도
큰 값높은 처리량, 높은 레이턴시

fetch.max.wait.ms

fetch.min.bytes를 만족하지 않을 때 대기하는 최대 시간입니다.

props.put("fetch.max.wait.ms", "500");  // 기본값: 500ms
  • fetch.min.bytes와 함께 동작
  • 둘 중 하나의 조건이 충족되면 응답 반환
조건: fetch.min.bytes = 1MB, fetch.max.wait.ms = 500ms

시나리오 1: 100ms 내에 1MB 도달 → 100ms 후 응답
시나리오 2: 500ms까지 500KB만 수집 → 500ms 후 응답 (500KB)

fetch.max.bytes

한 번의 fetch 요청에서 가져올 최대 데이터 크기입니다.

props.put("fetch.max.bytes", "52428800");  // 기본값: 50MB
  • Consumer 메모리 사용량에 영향
  • 브로커 설정 message.max.bytes보다 커야 함

max.partition.fetch.bytes

파티션당 가져올 최대 데이터 크기입니다.

props.put("max.partition.fetch.bytes", "1048576");  // 기본값: 1MB
  • 파티션 수 × max.partition.fetch.bytes ≈ 메모리 사용량
  • 큰 메시지가 있으면 더 크게 설정 필요

Poll 관련 설정

max.poll.records

한 번의 poll()에서 반환할 최대 레코드 수입니다.

props.put("max.poll.records", "500");  // 기본값: 500
시나리오권장값
빠른 처리높은 값 (1000+)
느린 처리낮은 값 (100-200)
메모리 제한낮은 값

max.poll.interval.ms

poll() 호출 간 최대 허용 시간입니다.

props.put("max.poll.interval.ms", "300000");  // 기본값: 5분

초과 시 영향:

  • Consumer가 그룹에서 제외됨
  • 리밸런싱 발생
Timeline:
poll() ────── processing ────── poll()
       |←─── max.poll.interval.ms ───→|

처리 시간이 이 값을 초과하면:
→ Consumer "dead"로 간주
→ 리밸런싱 트리거

권장 설정:

// 처리 시간이 긴 경우
props.put("max.poll.interval.ms", "600000");  // 10분
props.put("max.poll.records", "100");  // 배치 크기 줄이기

Session 관련 설정

session.timeout.ms

Consumer 세션 타임아웃입니다.

props.put("session.timeout.ms", "45000");  // 기본값: 45초 (Kafka 3.0+)
  • 하트비트가 이 시간 동안 없으면 Consumer 죽은 것으로 간주
  • 너무 짧으면: GC pause 등으로 잘못된 리밸런싱
  • 너무 길면: 장애 감지 지연

heartbeat.interval.ms

하트비트 전송 주기입니다.

props.put("heartbeat.interval.ms", "3000");  // 기본값: 3초

권장 관계:

session.timeout.ms >= 3 × heartbeat.interval.ms

설정 예시:

props.put("session.timeout.ms", "10000");     // 10초
props.put("heartbeat.interval.ms", "3000");   // 3초

Offset 관련 설정

enable.auto.commit

자동 오프셋 커밋 여부입니다.

props.put("enable.auto.commit", "true");  // 기본값: true
효과
true자동 커밋, 구현 간단, 중복 가능
false수동 커밋, 정밀 제어, 안전

auto.commit.interval.ms

자동 커밋 주기입니다.

props.put("auto.commit.interval.ms", "5000");  // 기본값: 5초
  • enable.auto.commit=true일 때만 적용
  • 짧으면: 오버헤드 증가
  • 길면: 중복 처리 증가

auto.offset.reset

오프셋이 없을 때의 시작 위치입니다.

props.put("auto.offset.reset", "latest");  // 기본값: latest
동작
latest최신 메시지부터
earliest처음부터
none예외 발생

파티션 할당 설정

partition.assignment.strategy

파티션 할당 전략입니다.

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

주요 전략:

전략클래스특징
RangeRangeAssignor기본값, 토픽별 범위 할당
RoundRobinRoundRobinAssignor균등 분배
StickyStickyAssignor기존 할당 유지
CooperativeStickyCooperativeStickyAssignor점진적 리밸런싱

권장: CooperativeStickyAssignor (Kafka 2.4+)

props.put("partition.assignment.strategy",
    Arrays.asList(
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
    ));

group.instance.id

Static Membership을 위한 고정 ID입니다.

props.put("group.instance.id", "consumer-instance-1");
  • 설정 시 Static Member로 동작
  • 일시적 연결 끊김에도 파티션 유지
  • 롤링 재시작 시 리밸런싱 방지

네트워크 설정

request.timeout.ms

요청 타임아웃입니다.

props.put("request.timeout.ms", "30000");  // 기본값: 30초

default.api.timeout.ms

Admin API 호출 타임아웃입니다.

props.put("default.api.timeout.ms", "60000");  // 기본값: 60초

connections.max.idle.ms

유휴 연결 유지 시간입니다.

props.put("connections.max.idle.ms", "540000");  // 기본값: 9분

reconnect.backoff.ms

재연결 대기 시간입니다.

props.put("reconnect.backoff.ms", "50");  // 기본값: 50ms
props.put("reconnect.backoff.max.ms", "1000");  // 최대 대기 시간

보안 설정

SSL 설정

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");
props.put("ssl.keystore.location", "/path/to/keystore.jks");
props.put("ssl.keystore.password", "password");
props.put("ssl.key.password", "password");

SASL 설정

props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"user\" password=\"password\";");

설정 조합 예시

저지연 처리

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "low-latency-group");
 
// Fetch 설정 - 빠른 응답
props.put("fetch.min.bytes", "1");
props.put("fetch.max.wait.ms", "100");
 
// Poll 설정 - 작은 배치
props.put("max.poll.records", "100");
 
// Session 설정 - 빠른 감지
props.put("session.timeout.ms", "10000");
props.put("heartbeat.interval.ms", "3000");

고처리량 처리

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "high-throughput-group");
 
// Fetch 설정 - 큰 배치
props.put("fetch.min.bytes", "1048576");  // 1MB
props.put("fetch.max.wait.ms", "500");
props.put("fetch.max.bytes", "104857600");  // 100MB
 
// Poll 설정 - 큰 배치
props.put("max.poll.records", "10000");
props.put("max.poll.interval.ms", "600000");  // 10분

안정적 처리

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "reliable-group");
 
// Offset 설정 - 수동 커밋
props.put("enable.auto.commit", "false");
 
// 할당 설정 - 안정적 할당
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("group.instance.id", "stable-consumer-1");
 
// Session 설정 - 여유있는 타임아웃
props.put("session.timeout.ms", "45000");
props.put("heartbeat.interval.ms", "10000");

느린 처리 (배치 작업)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "batch-processing-group");
 
// 긴 처리 시간 허용
props.put("max.poll.interval.ms", "1800000");  // 30분
props.put("max.poll.records", "50");  // 작은 배치
 
// 세션 유지
props.put("session.timeout.ms", "60000");
props.put("heartbeat.interval.ms", "10000");

설정 요약표

설정기본값영향
fetch.min.bytes1레이턴시 vs 처리량
fetch.max.wait.ms500ms레이턴시
max.poll.records500배치 크기
max.poll.interval.ms5분처리 시간 허용
session.timeout.ms45초장애 감지 속도
heartbeat.interval.ms3초그룹 조정 빈도
enable.auto.committrue오프셋 관리 방식
auto.offset.resetlatest초기 위치

관련 문서