Consumer 기본 개념

Kafka Consumer는 Kafka 클러스터의 토픽에서 메시지를 읽어오는 클라이언트 애플리케이션입니다.

Consumer의 역할

Consumer는 다음과 같은 역할을 수행합니다:

  1. 메시지 구독: 하나 이상의 토픽을 구독
  2. 메시지 폴링: 브로커로부터 메시지를 가져옴
  3. 역직렬화: 바이트 배열을 애플리케이션이 사용할 수 있는 객체로 변환
  4. 오프셋 관리: 읽은 위치를 추적하고 커밋
  5. 메시지 처리: 비즈니스 로직 수행
  6. 그룹 조정: Consumer Group 내에서 파티션 할당 조정

Consumer 아키텍처

Kafka Broker
    ↓
Fetcher (I/O Thread)
    ↓
ConsumerNetworkClient
    ↓
Deserializer (Key & Value)
    ↓
ConsumerRecords
    ↓
Application Processing
    ↓
Offset Commit

주요 컴포넌트

1. Fetcher (페처)

  • 브로커로부터 메시지를 가져오는 역할
  • 백그라운드에서 데이터를 미리 가져와 버퍼에 저장
  • fetch.min.bytes, fetch.max.wait.ms 설정으로 제어

2. Deserializer (역직렬화기)

  • 바이트 배열을 애플리케이션 객체로 변환
  • 기본 제공: StringDeserializer, ByteArrayDeserializer, IntegerDeserializer 등
  • 커스텀 역직렬화기 구현 가능

3. ConsumerCoordinator (컨슈머 코디네이터)

  • Consumer Group 멤버십 관리
  • 파티션 할당 및 리밸런싱 처리
  • Group Coordinator(브로커)와 통신

4. OffsetManager (오프셋 매니저)

  • 오프셋 커밋 및 조회 관리
  • Auto Commit 또는 Manual Commit 처리

Consumer 생성 예제

Java

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 
// 토픽 구독
consumer.subscribe(Arrays.asList("my-topic"));
 
// 메시지 폴링
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
            record.partition(), record.offset(), record.key(), record.value());
    }
}

Python (kafka-python)

from kafka import KafkaConsumer
 
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda v: v.decode('utf-8')
)
 
# 메시지 폴링
for message in consumer:
    print(f"partition={message.partition}, offset={message.offset}, "
          f"key={message.key}, value={message.value}")

Consumer 메시지 구조

ConsumerRecord

Consumer가 수신하는 메시지는 다음 정보를 포함합니다:

  • Topic: 메시지가 속한 토픽 이름
  • Partition: 메시지가 속한 파티션 번호
  • Offset: 파티션 내 메시지의 고유 위치
  • Key: 메시지 키 (null 가능)
  • Value: 메시지 본문
  • Timestamp: 메시지 타임스탬프
  • Headers: 메타데이터 헤더
ConsumerRecord<String, String> record = ...;
 
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
Headers headers = record.headers();

구독 방식

1. subscribe() - 토픽 기반 구독

// 토픽 이름으로 구독
consumer.subscribe(Arrays.asList("topic1", "topic2"));
 
// 정규식으로 구독
consumer.subscribe(Pattern.compile("topic-.*"));
  • Consumer Group을 통한 자동 파티션 할당
  • 리밸런싱 자동 처리
  • 동적 파티션 할당

2. assign() - 파티션 직접 할당

// 특정 파티션 직접 할당
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));
  • Consumer Group 없이 사용 가능
  • 리밸런싱 없음
  • 정밀한 파티션 제어 필요 시 사용

subscribe vs assign 비교

특성subscribeassign
파티션 할당자동수동
Consumer Group필수선택
리밸런싱자동없음
동적 확장지원직접 관리 필요
사용 사례일반적인 경우특수한 경우

Poll 루프

Consumer의 핵심은 poll() 루프입니다:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
    }
} finally {
    consumer.close();
}

poll()의 역할

  1. 브로커로부터 레코드 가져오기
  2. 하트비트 전송 (Consumer Group 멤버십 유지)
  3. 파티션 리밸런싱 트리거
  4. 오프셋 커밋 (auto commit 설정 시)

poll() 타임아웃

  • 지정된 시간 동안 데이터를 기다림
  • 타임아웃 전에 데이터가 있으면 즉시 반환
  • 하트비트와 리밸런싱을 위해 적절한 값 설정 필요
// 너무 긴 타임아웃: 리밸런싱 지연
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(5)); // ❌
 
// 적절한 타임아웃
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // ✓

Consumer 종료

Consumer를 안전하게 종료하는 방법:

// 메인 스레드에서
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down...");
    consumer.wakeup();  // poll()을 중단시킴
}));
 
// Consumer 스레드에서
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // 처리...
    }
} catch (WakeupException e) {
    // wakeup() 호출로 인한 예외 - 정상적인 종료
} finally {
    consumer.close();  // 리소스 정리 및 Group 탈퇴
}

wakeup() 메서드

  • 다른 스레드에서 안전하게 호출 가능
  • poll()에서 WakeupException 발생
  • Consumer 종료 시 권장되는 방법

스레드 안전성

Kafka Consumer는 스레드 안전하지 않습니다:

  • 하나의 Consumer 인스턴스는 단일 스레드에서만 사용
  • 멀티스레드 처리가 필요하면 여러 Consumer 인스턴스 생성
  • wakeup()만 다른 스레드에서 호출 가능

멀티스레드 처리 패턴

패턴 1: Consumer per Thread

// 각 스레드마다 독립적인 Consumer
for (int i = 0; i < numThreads; i++) {
    new Thread(() -> {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        // 폴링 루프...
    }).start();
}

패턴 2: 처리 로직 분리

// 하나의 Consumer + 워커 스레드 풀
ExecutorService executor = Executors.newFixedThreadPool(10);
 
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        executor.submit(() -> processRecord(record));
    }
}

성능 고려사항

1. 폴링 주기

  • 너무 짧으면: CPU 사용량 증가
  • 너무 길면: 처리 지연, 리밸런싱 지연

2. 배치 크기

  • max.poll.records: 한 번에 가져올 최대 레코드 수
  • 크면 처리량 증가, 작으면 레이턴시 감소

3. 메모리 관리

  • fetch.max.bytes: 한 번 요청에 가져올 최대 바이트
  • Consumer 메모리 사용량에 영향

4. 처리 시간

  • max.poll.interval.ms 내에 처리 완료 필요
  • 초과 시 Consumer가 그룹에서 제외됨

관련 문서