Consumer Group과 Rebalancing

Consumer Group은 Kafka에서 메시지를 병렬로 소비하고 장애 복구를 제공하는 핵심 메커니즘입니다.

Consumer Group 개념

정의

Consumer Group은 하나 이상의 Consumer가 협력하여 토픽의 메시지를 소비하는 논리적 그룹입니다.

Topic: orders (3 partitions)

Consumer Group: order-processing
┌─────────────────────────────────────────┐
│  Consumer 1 ←── Partition 0             │
│  Consumer 2 ←── Partition 1             │
│  Consumer 3 ←── Partition 2             │
└─────────────────────────────────────────┘

핵심 원칙

  1. 파티션 독점: 한 파티션은 그룹 내 하나의 Consumer만 읽을 수 있음
  2. 자동 분배: 파티션은 그룹 내 Consumer들에게 자동으로 분배됨
  3. 확장성: Consumer 추가/제거 시 자동으로 재분배
  4. 장애 복구: Consumer 장애 시 다른 Consumer가 파티션을 인수

Consumer Group의 장점

1. 병렬 처리

Topic: events (6 partitions)
Consumer Group: event-processor (3 consumers)

Consumer 1: Partition 0, 1
Consumer 2: Partition 2, 3
Consumer 3: Partition 4, 5

2. 확장성

  • Consumer 추가: 부하 분산
  • Consumer 제거: 나머지가 파티션 인수
  • 파티션 수까지 Consumer 확장 가능

3. 고가용성

  • Consumer 장애 시 자동 failover
  • 오프셋 공유로 이어서 처리 가능

4. 독립적인 소비

Topic: logs

Consumer Group A: log-analytics    → 분석용
Consumer Group B: log-archiver     → 보관용
Consumer Group C: log-alerter      → 알림용

각 그룹은 독립적으로 토픽의 모든 메시지를 소비합니다.

Group Coordinator

역할

Group Coordinator는 Consumer Group을 관리하는 브로커입니다:

  1. 멤버십 관리: Consumer 가입/탈퇴 추적
  2. 하트비트 모니터링: Consumer 생존 확인
  3. 리밸런싱 조정: 파티션 재할당 조율
  4. 오프셋 저장: 그룹의 커밋된 오프셋 관리

Coordinator 선정

hash(group.id) % __consumer_offsets 파티션 수
→ 해당 파티션의 리더 브로커가 Coordinator

Group Leader

역할

Group Leader는 Consumer Group 내에서 선출된 Consumer입니다:

  1. 파티션 할당 계산: 어떤 Consumer가 어떤 파티션을 담당할지 결정
  2. 할당 결과 전송: Coordinator에게 할당 결과 전달

선출 방식

  • 가장 먼저 JoinGroup 요청을 보낸 Consumer
  • Coordinator가 지정

Rebalancing (리밸런싱)

정의

파티션을 Consumer들에게 재할당하는 프로세스입니다.

리밸런싱 트리거 조건

트리거설명
Consumer 가입새 Consumer가 그룹에 참여
Consumer 탈퇴Consumer가 그룹을 떠남
Consumer 장애하트비트 타임아웃
토픽 변경구독 토픽의 파티션 수 변경
구독 변경Consumer의 구독 토픽 변경

리밸런싱 프로세스

1. 트리거 발생
     ↓
2. Coordinator가 리밸런싱 시작
     ↓
3. 모든 Consumer에게 JoinGroup 요청
     ↓
4. Consumer들이 JoinGroup 응답 (구독 정보 포함)
     ↓
5. Group Leader가 파티션 할당 계산
     ↓
6. Leader가 SyncGroup으로 할당 정보 전송
     ↓
7. Coordinator가 각 Consumer에게 할당 결과 전달
     ↓
8. Consumer들이 새 파티션에서 소비 시작

리밸런싱의 영향

Stop-the-World 문제

  • 리밸런싱 중 모든 Consumer가 메시지 처리 중단
  • 처리량 일시적 감소
  • 대규모 그룹에서 더 큰 영향
Timeline:
[Processing] ─── [Rebalancing (Pause)] ─── [Processing]
                      ↑
                 모든 Consumer 일시 중단

파티션 할당 전략

1. Range Assignor (범위 할당)

토픽별로 파티션을 연속 범위로 할당합니다.

Topic: orders (6 partitions)
Consumer Group: 3 consumers

Consumer 1: Partition 0, 1
Consumer 2: Partition 2, 3
Consumer 3: Partition 4, 5

특징:

  • 기본 할당 전략
  • 토픽별로 독립적으로 할당
  • 여러 토픽 구독 시 불균형 가능

불균형 예시:

Topic A (3 partitions), Topic B (3 partitions)
2 Consumers

Consumer 1: A0, A1, B0, B1  (4개)
Consumer 2: A2, B2          (2개)

2. RoundRobin Assignor (라운드 로빈)

모든 파티션을 순환하며 균등 할당합니다.

Topic A (3 partitions), Topic B (3 partitions)
2 Consumers

Consumer 1: A0, A2, B1
Consumer 2: A1, B0, B2

특징:

  • 균등한 분배
  • 모든 Consumer가 같은 토픽 구독해야 효과적
  • 순서 보장이 어려울 수 있음

3. Sticky Assignor (고정 할당)

기존 할당을 최대한 유지하면서 재할당합니다.

Before Rebalancing:
Consumer 1: P0, P1
Consumer 2: P2, P3
Consumer 3: P4, P5

Consumer 2 Down → After Rebalancing:
Consumer 1: P0, P1, P2    (P0, P1 유지)
Consumer 3: P3, P4, P5    (P4, P5 유지)

특징:

  • 리밸런싱 시 이동 최소화
  • 캐시 활용 가능
  • Kafka 0.11+에서 지원

4. Cooperative Sticky Assignor (협력적 고정 할당)

점진적으로 파티션을 재할당합니다.

Before:
Consumer 1: P0, P1
Consumer 2: P2, P3

Consumer 3 Join → Incremental Rebalancing:
Step 1: Consumer 2에서 P3 revoke
Step 2: Consumer 3에 P3 할당
(Consumer 1, 2는 나머지 파티션 계속 처리)

특징:

  • Stop-the-World 문제 완화
  • 점진적 재할당
  • Kafka 2.4+에서 지원

할당 전략 설정

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

할당 전략 비교

전략균등성안정성Stop-the-World권장 사용
Range낮음낮음발생단일 토픽
RoundRobin높음낮음발생균등 분배 필요
Sticky높음높음발생일반적 사용
CooperativeSticky높음높음없음권장 (Kafka 2.4+)

하트비트와 세션

하트비트 메커니즘

Consumer는 주기적으로 하트비트를 전송하여 생존을 알립니다:

Consumer ──heartbeat──> Coordinator
         <──response───

관련 설정

// 하트비트 전송 주기
props.put("heartbeat.interval.ms", "3000");  // 3초마다
 
// 세션 타임아웃
props.put("session.timeout.ms", "45000");  // 45초
 
// poll 간격 타임아웃
props.put("max.poll.interval.ms", "300000");  // 5분

타임아웃 관계

session.timeout.ms > heartbeat.interval.ms * 3 (권장)

heartbeat.interval.ms: 3초
session.timeout.ms: 10초 이상 권장

세션 타임아웃 vs Poll 간격 타임아웃

설정감지 대상영향
session.timeout.ms네트워크 장애, JVM 크래시빠른 장애 감지
max.poll.interval.ms느린 메시지 처리처리 시간 보장

Static Group Membership

개념

Consumer에게 고정 ID를 부여하여 리밸런싱을 줄이는 기능입니다.

설정

props.put("group.instance.id", "consumer-host-1");

동작 방식

일반 Consumer:

Consumer 재시작 → 리밸런싱 발생

Static Member:

Consumer 재시작 (session.timeout.ms 내)
→ 기존 할당 유지
→ 리밸런싱 없음

장점

  • 롤링 업데이트 시 리밸런싱 최소화
  • 일시적 장애에 대한 내성
  • 안정적인 파티션 할당

주의사항

  • session.timeout.ms 내에 재시작해야 함
  • 각 Consumer마다 고유한 ID 필요
  • 동일 ID로 두 Consumer 실행 불가

리밸런싱 리스너

ConsumerRebalanceListener

파티션 할당/해제 시 콜백을 받을 수 있습니다:

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
 
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 파티션이 해제되기 전 호출
        // 오프셋 커밋, 리소스 정리 등
        System.out.println("Revoked: " + partitions);
        consumer.commitSync();
    }
 
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 파티션이 할당된 후 호출
        // 초기화 작업 등
        System.out.println("Assigned: " + partitions);
    }
});

활용 사례

1. 오프셋 커밋

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 리밸런싱 전 현재까지 처리한 오프셋 커밋
    consumer.commitSync(currentOffsets);
}

2. 외부 시스템 정리

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 데이터베이스 트랜잭션 완료
    database.commitTransaction();
    // 파일 핸들러 정리
    fileHandlers.close();
}

3. 상태 복구

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 로컬 캐시 초기화
    for (TopicPartition partition : partitions) {
        cache.loadState(partition);
    }
}

Best Practices

1. 리밸런싱 최소화

// Cooperative 할당 전략 사용
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
 
// Static Membership 사용
props.put("group.instance.id", "unique-consumer-id");

2. 적절한 타임아웃 설정

// 빠른 장애 감지
props.put("session.timeout.ms", "10000");
props.put("heartbeat.interval.ms", "3000");
 
// 충분한 처리 시간 보장
props.put("max.poll.interval.ms", "300000");

3. Consumer 수 최적화

Consumer 수 <= 파티션 수

예: 6 파티션 → 최대 6 Consumer
    (7번째 Consumer는 유휴 상태)

4. 리밸런싱 리스너 구현

// 항상 오프셋을 안전하게 커밋
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    consumer.commitSync();
}

5. 처리 시간 관리

// max.poll.interval.ms 초과 방지
props.put("max.poll.records", "100");  // 적절한 배치 크기
 
// 또는 처리 로직 최적화
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    processWithTimeout(records, MAX_PROCESSING_TIME);
}

관련 문서