Consumer Group 설계

효과적인 Consumer Group 설계는 Kafka 기반 시스템의 확장성과 안정성에 핵심적입니다.

Consumer Group 기본

개념

┌─────────────────────────────────────────────────────────────────┐
│                    Consumer Group Concept                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Topic: orders (6 partitions)                                   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ Consumer Group: order-processor                          │   │
│  │                                                          │   │
│  │  Consumer 1   Consumer 2   Consumer 3                    │   │
│  │  [P0, P1]     [P2, P3]     [P4, P5]                      │   │
│  │                                                          │   │
│  │  각 Consumer가 고유한 파티션을 담당                       │   │
│  │  같은 메시지는 그룹 내에서 한 번만 처리됨                 │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ Consumer Group: analytics                                │   │
│  │                                                          │   │
│  │  Consumer A                                              │   │
│  │  [P0, P1, P2, P3, P4, P5]                                │   │
│  │                                                          │   │
│  │  별도 그룹으로 동일 메시지를 독립적으로 소비               │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

핵심 규칙

1. 파티션 : Consumer = N : 1
   - 하나의 파티션은 그룹 내 하나의 Consumer만 소비
   - 하나의 Consumer는 여러 파티션 소비 가능

2. Consumer 수 ≤ 파티션 수
   - 초과 Consumer는 유휴 상태

3. 다른 그룹은 독립적
   - 동일 토픽을 여러 그룹이 각자 소비 가능

Consumer Group 설계 패턴

패턴 1: 단일 처리 그룹

사용 사례: 단순 이벤트 처리

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│  Topic: events                                                  │
│                     │                                           │
│                     ▼                                           │
│           ┌─────────────────┐                                   │
│           │ Consumer Group  │                                   │
│           │ event-processor │                                   │
│           └─────────────────┘                                   │
│                     │                                           │
│                     ▼                                           │
│              Event Handler                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
// 설정
Properties props = new Properties();
props.put("group.id", "event-processor");
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false");
 
KafkaConsumer<String, Event> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("events"));

패턴 2: 다중 구독 그룹

사용 사례: 여러 토픽을 하나의 그룹이 처리

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│  ┌───────────┐ ┌───────────┐ ┌───────────┐                     │
│  │  orders   │ │ payments  │ │ shipments │                     │
│  └───────────┘ └───────────┘ └───────────┘                     │
│        │             │             │                            │
│        └─────────────┼─────────────┘                            │
│                      ▼                                          │
│           ┌─────────────────────┐                               │
│           │   Consumer Group    │                               │
│           │ order-fulfillment   │                               │
│           └─────────────────────┘                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
// 여러 토픽 구독
consumer.subscribe(Arrays.asList("orders", "payments", "shipments"));
 
// 또는 패턴 매칭
consumer.subscribe(Pattern.compile("order-.*"));

패턴 3: 팬아웃 (Fan-out)

사용 사례: 동일 메시지를 여러 시스템에서 독립 처리

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│                    Topic: order-created                         │
│                           │                                     │
│           ┌───────────────┼───────────────┐                     │
│           │               │               │                     │
│           ▼               ▼               ▼                     │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐               │
│  │  inventory  │ │  shipping   │ │  analytics  │               │
│  │  -service   │ │  -service   │ │  -service   │               │
│  └─────────────┘ └─────────────┘ └─────────────┘               │
│                                                                 │
│  각 서비스는 별도 Consumer Group으로 독립 소비                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
// inventory-service
props.put("group.id", "inventory-service");
 
// shipping-service
props.put("group.id", "shipping-service");
 
// analytics-service
props.put("group.id", "analytics-service");

패턴 4: 파이프라인

사용 사례: 단계별 데이터 처리

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│  raw-events → enricher → enriched-events → processor → results │
│                                                                 │
│  ┌─────────┐   ┌───────────────┐   ┌─────────────────┐         │
│  │raw-     │   │  enricher     │   │ enriched-       │         │
│  │events   │──▶│  group.id=    │──▶│ events          │         │
│  │         │   │  enricher     │   │                 │         │
│  └─────────┘   └───────────────┘   └─────────────────┘         │
│                                            │                    │
│                                            ▼                    │
│                                    ┌───────────────┐           │
│                                    │  processor    │           │
│                                    │  group.id=    │           │
│                                    │  processor    │           │
│                                    └───────────────┘           │
│                                            │                    │
│                                            ▼                    │
│                                    ┌───────────────┐           │
│                                    │   results     │           │
│                                    └───────────────┘           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

패턴 5: 실패 처리 분리

사용 사례: 메인 처리와 재시도 분리

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│  Topic: orders                                                  │
│        │                                                        │
│        ▼                                                        │
│  ┌───────────────┐     실패     ┌───────────────┐              │
│  │ order-        │ ─────────▶  │ orders-dlq    │              │
│  │ processor     │             └───────────────┘              │
│  │ group.id=     │                    │                        │
│  │ order-main    │                    ▼                        │
│  └───────────────┘             ┌───────────────┐              │
│        │                       │ dlq-processor │              │
│        │ 성공                   │ group.id=     │              │
│        ▼                       │ order-dlq     │              │
│  DB 저장                        └───────────────┘              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Consumer Group 명명 규칙

권장 패턴

형식: {서비스명}-{기능}-{환경}

예시:
- order-service-processor-prod
- payment-gateway-validator-staging
- analytics-etl-daily-prod

또는 단순화:
- {서비스명}-{기능}
- order-processor
- payment-validator

피해야 할 패턴

❌ 너무 일반적
- consumer1
- my-group
- test

❌ 환경 정보 누락 (멀티 환경)
- order-processor (dev? prod?)

❌ 버전 포함 (변경 어려움)
- order-processor-v1

Consumer Group 크기 결정

계산 공식

최적 Consumer 수 = min(
    파티션 수,
    필요 처리량 / Consumer당 처리량,
    가용 리소스로 운영 가능한 인스턴스 수
)

예시:
- 파티션 수: 12
- 필요 처리량: 10,000 msg/s
- Consumer당 처리량: 2,000 msg/s
- 가용 인스턴스: 8

Consumer 수 = min(12, 10000/2000, 8) = min(12, 5, 8) = 5
권장: 6개 (여유 고려)

확장 가이드라인

┌─────────────────────────────────────────────────────────────────┐
│                    Scaling Guidelines                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Lag 증가 감지                                                   │
│       │                                                         │
│       ▼                                                         │
│  Consumer 수 < 파티션 수?                                        │
│       │                                                         │
│    YES │ NO                                                     │
│       │   │                                                     │
│       ▼   ▼                                                     │
│  Consumer │ 파티션 수                                            │
│  추가     │ 증가 검토                                            │
│           │                                                     │
│           ▼                                                     │
│       처리 로직                                                  │
│       최적화 검토                                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Rebalance 최적화

Static Membership

// 안정적인 Consumer ID로 불필요한 Rebalance 방지
props.put("group.instance.id", "consumer-" + hostname);
props.put("session.timeout.ms", "300000");  // 5분

Cooperative Rebalancing

// 점진적 Rebalance로 다운타임 최소화
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Rebalance 리스너

consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 현재 처리 중인 작업 완료
        // Offset 커밋
        commitOffsets();
    }
 
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 새 파티션 처리 준비
        // 필요시 Offset 조회
        loadOffsets(partitions);
    }
});

에러 처리 전략

재시도 전략

public class RetryingConsumer {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_BACKOFF_MS = 1000;
 
    public void processWithRetry(ConsumerRecord<String, String> record) {
        int retries = 0;
        while (retries < MAX_RETRIES) {
            try {
                process(record);
                return;
            } catch (RetryableException e) {
                retries++;
                if (retries < MAX_RETRIES) {
                    Thread.sleep(RETRY_BACKOFF_MS * retries);
                }
            }
        }
        sendToDeadLetterQueue(record);
    }
}

Dead Letter Queue

public class DLQHandler {
    private KafkaProducer<String, String> dlqProducer;
 
    public void sendToDLQ(ConsumerRecord<String, String> record, Exception error) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            record.topic() + "-dlq",
            record.key(),
            record.value()
        );
 
        // 메타데이터 추가
        dlqRecord.headers()
            .add("original-topic", record.topic().getBytes())
            .add("original-partition", String.valueOf(record.partition()).getBytes())
            .add("original-offset", String.valueOf(record.offset()).getBytes())
            .add("error-message", error.getMessage().getBytes())
            .add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
 
        dlqProducer.send(dlqRecord);
    }
}

모니터링

핵심 메트릭

Consumer Group 메트릭:
- Consumer Lag (가장 중요)
- Rebalance 빈도
- 처리율 (records/sec)
- 커밋 지연

JMX 메트릭:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*
├── assigned-partitions
├── rebalance-latency-avg
├── rebalance-rate-per-hour
└── last-rebalance-seconds-ago

알림 설정

alerts:
  - name: HighConsumerLag
    condition: kafka_consumer_lag > 10000
    for: 5m
    severity: warning
 
  - name: FrequentRebalances
    condition: rebalance_rate_per_hour > 10
    for: 1h
    severity: warning
 
  - name: NoActiveConsumers
    condition: active_consumers == 0
    for: 1m
    severity: critical

Best Practices

1. 그룹 격리

서비스별로 Consumer Group 분리
- 장애 격리
- 독립적인 스케일링
- 명확한 책임

❌ 여러 서비스가 하나의 그룹 공유
✓ 서비스당 독립된 그룹

2. 처리 보장

// 최소 한 번 보장 (At-least-once)
props.put("enable.auto.commit", "false");
 
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // 처리
    }
    consumer.commitSync();  // 처리 후 커밋
}

3. 우아한 종료

public class GracefulConsumer implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private KafkaConsumer<String, String> consumer;
 
    public void shutdown() {
        running.set(false);
        consumer.wakeup();  // poll() 중단
    }
 
    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // 처리
            }
        } catch (WakeupException e) {
            if (running.get()) throw e;  // 예상치 못한 wakeup
        } finally {
            consumer.close();
        }
    }
}
 
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.shutdown();
    executor.shutdown();
}));

4. 설정 체크리스트

□ group.id 명확하게 정의
□ enable.auto.commit 의도에 맞게 설정
□ session.timeout.ms 적절히 설정
□ max.poll.records 처리 용량에 맞게
□ partition.assignment.strategy 선택
□ Static Membership 검토 (Kubernetes)
□ 에러 처리 전략 수립
□ 모니터링 설정

관련 문서