Consumer Group과 Rebalancing
Consumer Group은 Kafka에서 메시지를 병렬로 소비하고 장애 복구를 제공하는 핵심 메커니즘입니다.
Consumer Group 개념
정의
Consumer Group은 하나 이상의 Consumer가 협력하여 토픽의 메시지를 소비하는 논리적 그룹입니다.
Topic: orders (3 partitions)
Consumer Group: order-processing
┌─────────────────────────────────────────┐
│ Consumer 1 ←── Partition 0 │
│ Consumer 2 ←── Partition 1 │
│ Consumer 3 ←── Partition 2 │
└─────────────────────────────────────────┘
핵심 원칙
- 파티션 독점: 한 파티션은 그룹 내 하나의 Consumer만 읽을 수 있음
- 자동 분배: 파티션은 그룹 내 Consumer들에게 자동으로 분배됨
- 확장성: Consumer 추가/제거 시 자동으로 재분배
- 장애 복구: Consumer 장애 시 다른 Consumer가 파티션을 인수
Consumer Group의 장점
1. 병렬 처리
Topic: events (6 partitions)
Consumer Group: event-processor (3 consumers)
Consumer 1: Partition 0, 1
Consumer 2: Partition 2, 3
Consumer 3: Partition 4, 5
2. 확장성
- Consumer 추가: 부하 분산
- Consumer 제거: 나머지가 파티션 인수
- 파티션 수까지 Consumer 확장 가능
3. 고가용성
- Consumer 장애 시 자동 failover
- 오프셋 공유로 이어서 처리 가능
4. 독립적인 소비
Topic: logs
Consumer Group A: log-analytics → 분석용
Consumer Group B: log-archiver → 보관용
Consumer Group C: log-alerter → 알림용
각 그룹은 독립적으로 토픽의 모든 메시지를 소비합니다.
Group Coordinator
역할
Group Coordinator는 Consumer Group을 관리하는 브로커입니다:
- 멤버십 관리: Consumer 가입/탈퇴 추적
- 하트비트 모니터링: Consumer 생존 확인
- 리밸런싱 조정: 파티션 재할당 조율
- 오프셋 저장: 그룹의 커밋된 오프셋 관리
Coordinator 선정
hash(group.id) % __consumer_offsets 파티션 수
→ 해당 파티션의 리더 브로커가 Coordinator
Group Leader
역할
Group Leader는 Consumer Group 내에서 선출된 Consumer입니다:
- 파티션 할당 계산: 어떤 Consumer가 어떤 파티션을 담당할지 결정
- 할당 결과 전송: Coordinator에게 할당 결과 전달
선출 방식
- 가장 먼저 JoinGroup 요청을 보낸 Consumer
- Coordinator가 지정
Rebalancing (리밸런싱)
정의
파티션을 Consumer들에게 재할당하는 프로세스입니다.
리밸런싱 트리거 조건
| 트리거 | 설명 |
|---|---|
| Consumer 가입 | 새 Consumer가 그룹에 참여 |
| Consumer 탈퇴 | Consumer가 그룹을 떠남 |
| Consumer 장애 | 하트비트 타임아웃 |
| 토픽 변경 | 구독 토픽의 파티션 수 변경 |
| 구독 변경 | Consumer의 구독 토픽 변경 |
리밸런싱 프로세스
1. 트리거 발생
↓
2. Coordinator가 리밸런싱 시작
↓
3. 모든 Consumer에게 JoinGroup 요청
↓
4. Consumer들이 JoinGroup 응답 (구독 정보 포함)
↓
5. Group Leader가 파티션 할당 계산
↓
6. Leader가 SyncGroup으로 할당 정보 전송
↓
7. Coordinator가 각 Consumer에게 할당 결과 전달
↓
8. Consumer들이 새 파티션에서 소비 시작
리밸런싱의 영향
Stop-the-World 문제
- 리밸런싱 중 모든 Consumer가 메시지 처리 중단
- 처리량 일시적 감소
- 대규모 그룹에서 더 큰 영향
Timeline:
[Processing] ─── [Rebalancing (Pause)] ─── [Processing]
↑
모든 Consumer 일시 중단
파티션 할당 전략
1. Range Assignor (범위 할당)
토픽별로 파티션을 연속 범위로 할당합니다.
Topic: orders (6 partitions)
Consumer Group: 3 consumers
Consumer 1: Partition 0, 1
Consumer 2: Partition 2, 3
Consumer 3: Partition 4, 5
특징:
- 기본 할당 전략
- 토픽별로 독립적으로 할당
- 여러 토픽 구독 시 불균형 가능
불균형 예시:
Topic A (3 partitions), Topic B (3 partitions)
2 Consumers
Consumer 1: A0, A1, B0, B1 (4개)
Consumer 2: A2, B2 (2개)
2. RoundRobin Assignor (라운드 로빈)
모든 파티션을 순환하며 균등 할당합니다.
Topic A (3 partitions), Topic B (3 partitions)
2 Consumers
Consumer 1: A0, A2, B1
Consumer 2: A1, B0, B2
특징:
- 균등한 분배
- 모든 Consumer가 같은 토픽 구독해야 효과적
- 순서 보장이 어려울 수 있음
3. Sticky Assignor (고정 할당)
기존 할당을 최대한 유지하면서 재할당합니다.
Before Rebalancing:
Consumer 1: P0, P1
Consumer 2: P2, P3
Consumer 3: P4, P5
Consumer 2 Down → After Rebalancing:
Consumer 1: P0, P1, P2 (P0, P1 유지)
Consumer 3: P3, P4, P5 (P4, P5 유지)
특징:
- 리밸런싱 시 이동 최소화
- 캐시 활용 가능
- Kafka 0.11+에서 지원
4. Cooperative Sticky Assignor (협력적 고정 할당)
점진적으로 파티션을 재할당합니다.
Before:
Consumer 1: P0, P1
Consumer 2: P2, P3
Consumer 3 Join → Incremental Rebalancing:
Step 1: Consumer 2에서 P3 revoke
Step 2: Consumer 3에 P3 할당
(Consumer 1, 2는 나머지 파티션 계속 처리)
특징:
- Stop-the-World 문제 완화
- 점진적 재할당
- Kafka 2.4+에서 지원
할당 전략 설정
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");할당 전략 비교
| 전략 | 균등성 | 안정성 | Stop-the-World | 권장 사용 |
|---|---|---|---|---|
| Range | 낮음 | 낮음 | 발생 | 단일 토픽 |
| RoundRobin | 높음 | 낮음 | 발생 | 균등 분배 필요 |
| Sticky | 높음 | 높음 | 발생 | 일반적 사용 |
| CooperativeSticky | 높음 | 높음 | 없음 | 권장 (Kafka 2.4+) |
하트비트와 세션
하트비트 메커니즘
Consumer는 주기적으로 하트비트를 전송하여 생존을 알립니다:
Consumer ──heartbeat──> Coordinator
<──response───
관련 설정
// 하트비트 전송 주기
props.put("heartbeat.interval.ms", "3000"); // 3초마다
// 세션 타임아웃
props.put("session.timeout.ms", "45000"); // 45초
// poll 간격 타임아웃
props.put("max.poll.interval.ms", "300000"); // 5분타임아웃 관계
session.timeout.ms > heartbeat.interval.ms * 3 (권장)
heartbeat.interval.ms: 3초
session.timeout.ms: 10초 이상 권장
세션 타임아웃 vs Poll 간격 타임아웃
| 설정 | 감지 대상 | 영향 |
|---|---|---|
session.timeout.ms | 네트워크 장애, JVM 크래시 | 빠른 장애 감지 |
max.poll.interval.ms | 느린 메시지 처리 | 처리 시간 보장 |
Static Group Membership
개념
Consumer에게 고정 ID를 부여하여 리밸런싱을 줄이는 기능입니다.
설정
props.put("group.instance.id", "consumer-host-1");동작 방식
일반 Consumer:
Consumer 재시작 → 리밸런싱 발생
Static Member:
Consumer 재시작 (session.timeout.ms 내)
→ 기존 할당 유지
→ 리밸런싱 없음
장점
- 롤링 업데이트 시 리밸런싱 최소화
- 일시적 장애에 대한 내성
- 안정적인 파티션 할당
주의사항
session.timeout.ms내에 재시작해야 함- 각 Consumer마다 고유한 ID 필요
- 동일 ID로 두 Consumer 실행 불가
리밸런싱 리스너
ConsumerRebalanceListener
파티션 할당/해제 시 콜백을 받을 수 있습니다:
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 파티션이 해제되기 전 호출
// 오프셋 커밋, 리소스 정리 등
System.out.println("Revoked: " + partitions);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 파티션이 할당된 후 호출
// 초기화 작업 등
System.out.println("Assigned: " + partitions);
}
});활용 사례
1. 오프셋 커밋
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 리밸런싱 전 현재까지 처리한 오프셋 커밋
consumer.commitSync(currentOffsets);
}2. 외부 시스템 정리
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 데이터베이스 트랜잭션 완료
database.commitTransaction();
// 파일 핸들러 정리
fileHandlers.close();
}3. 상태 복구
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 로컬 캐시 초기화
for (TopicPartition partition : partitions) {
cache.loadState(partition);
}
}Best Practices
1. 리밸런싱 최소화
// Cooperative 할당 전략 사용
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Static Membership 사용
props.put("group.instance.id", "unique-consumer-id");2. 적절한 타임아웃 설정
// 빠른 장애 감지
props.put("session.timeout.ms", "10000");
props.put("heartbeat.interval.ms", "3000");
// 충분한 처리 시간 보장
props.put("max.poll.interval.ms", "300000");3. Consumer 수 최적화
Consumer 수 <= 파티션 수
예: 6 파티션 → 최대 6 Consumer
(7번째 Consumer는 유휴 상태)
4. 리밸런싱 리스너 구현
// 항상 오프셋을 안전하게 커밋
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync();
}5. 처리 시간 관리
// max.poll.interval.ms 초과 방지
props.put("max.poll.records", "100"); // 적절한 배치 크기
// 또는 처리 로직 최적화
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processWithTimeout(records, MAX_PROCESSING_TIME);
}
댓글 (0)