Consumer 기본 개념
Kafka Consumer는 Kafka 클러스터의 토픽에서 메시지를 읽어오는 클라이언트 애플리케이션입니다.
Consumer의 역할
Consumer는 다음과 같은 역할을 수행합니다:
- 메시지 구독: 하나 이상의 토픽을 구독
- 메시지 폴링: 브로커로부터 메시지를 가져옴
- 역직렬화: 바이트 배열을 애플리케이션이 사용할 수 있는 객체로 변환
- 오프셋 관리: 읽은 위치를 추적하고 커밋
- 메시지 처리: 비즈니스 로직 수행
- 그룹 조정: Consumer Group 내에서 파티션 할당 조정
Consumer 아키텍처
Kafka Broker
↓
Fetcher (I/O Thread)
↓
ConsumerNetworkClient
↓
Deserializer (Key & Value)
↓
ConsumerRecords
↓
Application Processing
↓
Offset Commit
주요 컴포넌트
1. Fetcher (페처)
- 브로커로부터 메시지를 가져오는 역할
- 백그라운드에서 데이터를 미리 가져와 버퍼에 저장
fetch.min.bytes,fetch.max.wait.ms설정으로 제어
2. Deserializer (역직렬화기)
- 바이트 배열을 애플리케이션 객체로 변환
- 기본 제공: StringDeserializer, ByteArrayDeserializer, IntegerDeserializer 등
- 커스텀 역직렬화기 구현 가능
3. ConsumerCoordinator (컨슈머 코디네이터)
- Consumer Group 멤버십 관리
- 파티션 할당 및 리밸런싱 처리
- Group Coordinator(브로커)와 통신
4. OffsetManager (오프셋 매니저)
- 오프셋 커밋 및 조회 관리
- Auto Commit 또는 Manual Commit 처리
Consumer 생성 예제
Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 토픽 구독
consumer.subscribe(Arrays.asList("my-topic"));
// 메시지 폴링
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}Python (kafka-python)
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-consumer-group',
key_deserializer=lambda k: k.decode('utf-8') if k else None,
value_deserializer=lambda v: v.decode('utf-8')
)
# 메시지 폴링
for message in consumer:
print(f"partition={message.partition}, offset={message.offset}, "
f"key={message.key}, value={message.value}")Consumer 메시지 구조
ConsumerRecord
Consumer가 수신하는 메시지는 다음 정보를 포함합니다:
- Topic: 메시지가 속한 토픽 이름
- Partition: 메시지가 속한 파티션 번호
- Offset: 파티션 내 메시지의 고유 위치
- Key: 메시지 키 (null 가능)
- Value: 메시지 본문
- Timestamp: 메시지 타임스탬프
- Headers: 메타데이터 헤더
ConsumerRecord<String, String> record = ...;
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
Headers headers = record.headers();구독 방식
1. subscribe() - 토픽 기반 구독
// 토픽 이름으로 구독
consumer.subscribe(Arrays.asList("topic1", "topic2"));
// 정규식으로 구독
consumer.subscribe(Pattern.compile("topic-.*"));- Consumer Group을 통한 자동 파티션 할당
- 리밸런싱 자동 처리
- 동적 파티션 할당
2. assign() - 파티션 직접 할당
// 특정 파티션 직접 할당
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));- Consumer Group 없이 사용 가능
- 리밸런싱 없음
- 정밀한 파티션 제어 필요 시 사용
subscribe vs assign 비교
| 특성 | subscribe | assign |
|---|---|---|
| 파티션 할당 | 자동 | 수동 |
| Consumer Group | 필수 | 선택 |
| 리밸런싱 | 자동 | 없음 |
| 동적 확장 | 지원 | 직접 관리 필요 |
| 사용 사례 | 일반적인 경우 | 특수한 경우 |
Poll 루프
Consumer의 핵심은 poll() 루프입니다:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
}
} finally {
consumer.close();
}poll()의 역할
- 브로커로부터 레코드 가져오기
- 하트비트 전송 (Consumer Group 멤버십 유지)
- 파티션 리밸런싱 트리거
- 오프셋 커밋 (auto commit 설정 시)
poll() 타임아웃
- 지정된 시간 동안 데이터를 기다림
- 타임아웃 전에 데이터가 있으면 즉시 반환
- 하트비트와 리밸런싱을 위해 적절한 값 설정 필요
// 너무 긴 타임아웃: 리밸런싱 지연
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(5)); // ❌
// 적절한 타임아웃
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // ✓Consumer 종료
Consumer를 안전하게 종료하는 방법:
// 메인 스레드에서
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down...");
consumer.wakeup(); // poll()을 중단시킴
}));
// Consumer 스레드에서
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 처리...
}
} catch (WakeupException e) {
// wakeup() 호출로 인한 예외 - 정상적인 종료
} finally {
consumer.close(); // 리소스 정리 및 Group 탈퇴
}wakeup() 메서드
- 다른 스레드에서 안전하게 호출 가능
poll()에서WakeupException발생- Consumer 종료 시 권장되는 방법
스레드 안전성
Kafka Consumer는 스레드 안전하지 않습니다:
- 하나의 Consumer 인스턴스는 단일 스레드에서만 사용
- 멀티스레드 처리가 필요하면 여러 Consumer 인스턴스 생성
wakeup()만 다른 스레드에서 호출 가능
멀티스레드 처리 패턴
패턴 1: Consumer per Thread
// 각 스레드마다 독립적인 Consumer
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
// 폴링 루프...
}).start();
}패턴 2: 처리 로직 분리
// 하나의 Consumer + 워커 스레드 풀
ExecutorService executor = Executors.newFixedThreadPool(10);
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processRecord(record));
}
}성능 고려사항
1. 폴링 주기
- 너무 짧으면: CPU 사용량 증가
- 너무 길면: 처리 지연, 리밸런싱 지연
2. 배치 크기
max.poll.records: 한 번에 가져올 최대 레코드 수- 크면 처리량 증가, 작으면 레이턴시 감소
3. 메모리 관리
fetch.max.bytes: 한 번 요청에 가져올 최대 바이트- Consumer 메모리 사용량에 영향
4. 처리 시간
max.poll.interval.ms내에 처리 완료 필요- 초과 시 Consumer가 그룹에서 제외됨
댓글 (0)