Consumer Group 설계
효과적인 Consumer Group 설계는 Kafka 기반 시스템의 확장성과 안정성에 핵심적입니다.
Consumer Group 기본
개념
┌─────────────────────────────────────────────────────────────────┐
│ Consumer Group Concept │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders (6 partitions) │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Consumer Group: order-processor │ │
│ │ │ │
│ │ Consumer 1 Consumer 2 Consumer 3 │ │
│ │ [P0, P1] [P2, P3] [P4, P5] │ │
│ │ │ │
│ │ 각 Consumer가 고유한 파티션을 담당 │ │
│ │ 같은 메시지는 그룹 내에서 한 번만 처리됨 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Consumer Group: analytics │ │
│ │ │ │
│ │ Consumer A │ │
│ │ [P0, P1, P2, P3, P4, P5] │ │
│ │ │ │
│ │ 별도 그룹으로 동일 메시지를 독립적으로 소비 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
핵심 규칙
1. 파티션 : Consumer = N : 1
- 하나의 파티션은 그룹 내 하나의 Consumer만 소비
- 하나의 Consumer는 여러 파티션 소비 가능
2. Consumer 수 ≤ 파티션 수
- 초과 Consumer는 유휴 상태
3. 다른 그룹은 독립적
- 동일 토픽을 여러 그룹이 각자 소비 가능
Consumer Group 설계 패턴
패턴 1: 단일 처리 그룹
사용 사례: 단순 이벤트 처리
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Topic: events │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Consumer Group │ │
│ │ event-processor │ │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ Event Handler │
│ │
└─────────────────────────────────────────────────────────────────┘
// 설정
Properties props = new Properties();
props.put("group.id", "event-processor");
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, Event> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("events"));패턴 2: 다중 구독 그룹
사용 사례: 여러 토픽을 하나의 그룹이 처리
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ orders │ │ payments │ │ shipments │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Consumer Group │ │
│ │ order-fulfillment │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
// 여러 토픽 구독
consumer.subscribe(Arrays.asList("orders", "payments", "shipments"));
// 또는 패턴 매칭
consumer.subscribe(Pattern.compile("order-.*"));패턴 3: 팬아웃 (Fan-out)
사용 사례: 동일 메시지를 여러 시스템에서 독립 처리
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Topic: order-created │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ inventory │ │ shipping │ │ analytics │ │
│ │ -service │ │ -service │ │ -service │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 각 서비스는 별도 Consumer Group으로 독립 소비 │
│ │
└─────────────────────────────────────────────────────────────────┘
// inventory-service
props.put("group.id", "inventory-service");
// shipping-service
props.put("group.id", "shipping-service");
// analytics-service
props.put("group.id", "analytics-service");패턴 4: 파이프라인
사용 사례: 단계별 데이터 처리
┌─────────────────────────────────────────────────────────────────┐
│ │
│ raw-events → enricher → enriched-events → processor → results │
│ │
│ ┌─────────┐ ┌───────────────┐ ┌─────────────────┐ │
│ │raw- │ │ enricher │ │ enriched- │ │
│ │events │──▶│ group.id= │──▶│ events │ │
│ │ │ │ enricher │ │ │ │
│ └─────────┘ └───────────────┘ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ processor │ │
│ │ group.id= │ │
│ │ processor │ │
│ └───────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ results │ │
│ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
패턴 5: 실패 처리 분리
사용 사례: 메인 처리와 재시도 분리
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Topic: orders │
│ │ │
│ ▼ │
│ ┌───────────────┐ 실패 ┌───────────────┐ │
│ │ order- │ ─────────▶ │ orders-dlq │ │
│ │ processor │ └───────────────┘ │
│ │ group.id= │ │ │
│ │ order-main │ ▼ │
│ └───────────────┘ ┌───────────────┐ │
│ │ │ dlq-processor │ │
│ │ 성공 │ group.id= │ │
│ ▼ │ order-dlq │ │
│ DB 저장 └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Consumer Group 명명 규칙
권장 패턴
형식: {서비스명}-{기능}-{환경}
예시:
- order-service-processor-prod
- payment-gateway-validator-staging
- analytics-etl-daily-prod
또는 단순화:
- {서비스명}-{기능}
- order-processor
- payment-validator
피해야 할 패턴
❌ 너무 일반적
- consumer1
- my-group
- test
❌ 환경 정보 누락 (멀티 환경)
- order-processor (dev? prod?)
❌ 버전 포함 (변경 어려움)
- order-processor-v1
Consumer Group 크기 결정
계산 공식
최적 Consumer 수 = min(
파티션 수,
필요 처리량 / Consumer당 처리량,
가용 리소스로 운영 가능한 인스턴스 수
)
예시:
- 파티션 수: 12
- 필요 처리량: 10,000 msg/s
- Consumer당 처리량: 2,000 msg/s
- 가용 인스턴스: 8
Consumer 수 = min(12, 10000/2000, 8) = min(12, 5, 8) = 5
권장: 6개 (여유 고려)
확장 가이드라인
┌─────────────────────────────────────────────────────────────────┐
│ Scaling Guidelines │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Lag 증가 감지 │
│ │ │
│ ▼ │
│ Consumer 수 < 파티션 수? │
│ │ │
│ YES │ NO │
│ │ │ │
│ ▼ ▼ │
│ Consumer │ 파티션 수 │
│ 추가 │ 증가 검토 │
│ │ │
│ ▼ │
│ 처리 로직 │
│ 최적화 검토 │
│ │
└─────────────────────────────────────────────────────────────────┘
Rebalance 최적화
Static Membership
// 안정적인 Consumer ID로 불필요한 Rebalance 방지
props.put("group.instance.id", "consumer-" + hostname);
props.put("session.timeout.ms", "300000"); // 5분Cooperative Rebalancing
// 점진적 Rebalance로 다운타임 최소화
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");Rebalance 리스너
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 현재 처리 중인 작업 완료
// Offset 커밋
commitOffsets();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 새 파티션 처리 준비
// 필요시 Offset 조회
loadOffsets(partitions);
}
});에러 처리 전략
재시도 전략
public class RetryingConsumer {
private static final int MAX_RETRIES = 3;
private static final long RETRY_BACKOFF_MS = 1000;
public void processWithRetry(ConsumerRecord<String, String> record) {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
process(record);
return;
} catch (RetryableException e) {
retries++;
if (retries < MAX_RETRIES) {
Thread.sleep(RETRY_BACKOFF_MS * retries);
}
}
}
sendToDeadLetterQueue(record);
}
}Dead Letter Queue
public class DLQHandler {
private KafkaProducer<String, String> dlqProducer;
public void sendToDLQ(ConsumerRecord<String, String> record, Exception error) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
record.topic() + "-dlq",
record.key(),
record.value()
);
// 메타데이터 추가
dlqRecord.headers()
.add("original-topic", record.topic().getBytes())
.add("original-partition", String.valueOf(record.partition()).getBytes())
.add("original-offset", String.valueOf(record.offset()).getBytes())
.add("error-message", error.getMessage().getBytes())
.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
}
}모니터링
핵심 메트릭
Consumer Group 메트릭:
- Consumer Lag (가장 중요)
- Rebalance 빈도
- 처리율 (records/sec)
- 커밋 지연
JMX 메트릭:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*
├── assigned-partitions
├── rebalance-latency-avg
├── rebalance-rate-per-hour
└── last-rebalance-seconds-ago
알림 설정
alerts:
- name: HighConsumerLag
condition: kafka_consumer_lag > 10000
for: 5m
severity: warning
- name: FrequentRebalances
condition: rebalance_rate_per_hour > 10
for: 1h
severity: warning
- name: NoActiveConsumers
condition: active_consumers == 0
for: 1m
severity: criticalBest Practices
1. 그룹 격리
서비스별로 Consumer Group 분리
- 장애 격리
- 독립적인 스케일링
- 명확한 책임
❌ 여러 서비스가 하나의 그룹 공유
✓ 서비스당 독립된 그룹
2. 처리 보장
// 최소 한 번 보장 (At-least-once)
props.put("enable.auto.commit", "false");
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 처리
}
consumer.commitSync(); // 처리 후 커밋
}3. 우아한 종료
public class GracefulConsumer implements Runnable {
private final AtomicBoolean running = new AtomicBoolean(true);
private KafkaConsumer<String, String> consumer;
public void shutdown() {
running.set(false);
consumer.wakeup(); // poll() 중단
}
@Override
public void run() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 처리
}
} catch (WakeupException e) {
if (running.get()) throw e; // 예상치 못한 wakeup
} finally {
consumer.close();
}
}
}
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.shutdown();
executor.shutdown();
}));4. 설정 체크리스트
□ group.id 명확하게 정의
□ enable.auto.commit 의도에 맞게 설정
□ session.timeout.ms 적절히 설정
□ max.poll.records 처리 용량에 맞게
□ partition.assignment.strategy 선택
□ Static Membership 검토 (Kubernetes)
□ 에러 처리 전략 수립
□ 모니터링 설정
댓글 (0)