Producer 기본 개념

Kafka Producer는 메시지를 Kafka 클러스터의 토픽으로 전송하는 클라이언트 애플리케이션입니다.

Producer의 역할

Producer는 다음과 같은 역할을 수행합니다:

  1. 메시지 생성: 애플리케이션에서 발생한 데이터를 Kafka 메시지로 변환
  2. 직렬화: 메시지의 키와 값을 바이트 배열로 직렬화
  3. 파티션 결정: 메시지가 전송될 파티션 결정
  4. 배치 처리: 성능 향상을 위해 메시지를 배치로 묶음
  5. 전송: Kafka 브로커로 메시지 전송
  6. 재시도: 실패한 전송에 대한 재시도 처리

Producer 아키텍처

Application
    ↓
Producer API
    ↓
Serializer (Key & Value)
    ↓
Partitioner
    ↓
Record Accumulator (Batch)
    ↓
Sender (I/O Thread)
    ↓
Kafka Broker

주요 컴포넌트

1. Serializer (직렬화기)

  • 메시지의 키와 값을 바이트 배열로 변환
  • 기본 제공: StringSerializer, ByteArraySerializer, IntegerSerializer 등
  • 커스텀 직렬화기 구현 가능

2. Partitioner (파티셔너)

  • 메시지가 전송될 파티션을 결정
  • 기본 전략: 키 해싱 또는 라운드 로빈
  • 커스텀 파티셔닝 로직 구현 가능

3. Record Accumulator (레코드 누적기)

  • 메시지를 메모리 버퍼에 배치로 축적
  • 네트워크 호출 횟수를 줄여 성능 향상
  • 버퍼 크기는 buffer.memory 설정으로 제어

4. Sender Thread (전송 스레드)

  • 별도의 I/O 스레드에서 브로커로 메시지 전송
  • 비동기 전송을 통해 애플리케이션 스레드 블로킹 방지

Producer 생성 예제

Java

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
// 메시지 전송
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
 
producer.close();

Python (kafka-python)

from kafka import KafkaProducer
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: v.encode('utf-8')
)
 
# 메시지 전송
producer.send('my-topic', key=b'key', value=b'value')
 
producer.close()

Producer 메시지 구조

ProducerRecord

Producer가 전송하는 메시지는 다음 요소로 구성됩니다:

  • Topic (필수): 메시지를 전송할 토픽 이름
  • Partition (선택): 특정 파티션 지정 (null이면 파티셔너가 결정)
  • Key (선택): 메시지 키 (파티셔닝 및 로그 압축에 사용)
  • Value (필수): 메시지 본문
  • Timestamp (선택): 메시지 타임스탬프 (기본값: 현재 시간)
  • Headers (선택): 메타데이터 헤더
ProducerRecord<String, String> record = new ProducerRecord<>(
    "my-topic",      // topic
    0,               // partition (optional)
    System.currentTimeMillis(), // timestamp (optional)
    "key",           // key (optional)
    "value",         // value
    headers          // headers (optional)
);

Producer 메타데이터

Producer는 클러스터 메타데이터를 주기적으로 갱신합니다:

  • 토픽별 파티션 정보
  • 각 파티션의 리더 브로커
  • 사용 가능한 브로커 목록

메타데이터 갱신 주기는 metadata.max.age.ms 설정으로 제어됩니다.

성능 고려사항

1. 배치 처리

  • 여러 메시지를 묶어서 전송하여 처리량 향상
  • batch.sizelinger.ms 설정 조정

2. 압축

  • 네트워크 대역폭과 스토리지 절약
  • compression.type: none, gzip, snappy, lz4, zstd

3. 비동기 전송

  • 애플리케이션 스레드 블로킹 방지
  • Callback을 통한 전송 결과 처리

4. 버퍼 관리

  • buffer.memory: Producer가 사용할 수 있는 전체 메모리
  • 버퍼가 가득 차면 max.block.ms 동안 블로킹

관련 문서