Producer 기본 개념
Kafka Producer는 메시지를 Kafka 클러스터의 토픽으로 전송하는 클라이언트 애플리케이션입니다.
Producer의 역할
Producer는 다음과 같은 역할을 수행합니다:
- 메시지 생성: 애플리케이션에서 발생한 데이터를 Kafka 메시지로 변환
- 직렬화: 메시지의 키와 값을 바이트 배열로 직렬화
- 파티션 결정: 메시지가 전송될 파티션 결정
- 배치 처리: 성능 향상을 위해 메시지를 배치로 묶음
- 전송: Kafka 브로커로 메시지 전송
- 재시도: 실패한 전송에 대한 재시도 처리
Producer 아키텍처
Application
↓
Producer API
↓
Serializer (Key & Value)
↓
Partitioner
↓
Record Accumulator (Batch)
↓
Sender (I/O Thread)
↓
Kafka Broker
주요 컴포넌트
1. Serializer (직렬화기)
- 메시지의 키와 값을 바이트 배열로 변환
- 기본 제공: StringSerializer, ByteArraySerializer, IntegerSerializer 등
- 커스텀 직렬화기 구현 가능
2. Partitioner (파티셔너)
- 메시지가 전송될 파티션을 결정
- 기본 전략: 키 해싱 또는 라운드 로빈
- 커스텀 파티셔닝 로직 구현 가능
3. Record Accumulator (레코드 누적기)
- 메시지를 메모리 버퍼에 배치로 축적
- 네트워크 호출 횟수를 줄여 성능 향상
- 버퍼 크기는
buffer.memory설정으로 제어
4. Sender Thread (전송 스레드)
- 별도의 I/O 스레드에서 브로커로 메시지 전송
- 비동기 전송을 통해 애플리케이션 스레드 블로킹 방지
Producer 생성 예제
Java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 메시지 전송
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
producer.close();Python (kafka-python)
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: v.encode('utf-8')
)
# 메시지 전송
producer.send('my-topic', key=b'key', value=b'value')
producer.close()Producer 메시지 구조
ProducerRecord
Producer가 전송하는 메시지는 다음 요소로 구성됩니다:
- Topic (필수): 메시지를 전송할 토픽 이름
- Partition (선택): 특정 파티션 지정 (null이면 파티셔너가 결정)
- Key (선택): 메시지 키 (파티셔닝 및 로그 압축에 사용)
- Value (필수): 메시지 본문
- Timestamp (선택): 메시지 타임스탬프 (기본값: 현재 시간)
- Headers (선택): 메타데이터 헤더
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic", // topic
0, // partition (optional)
System.currentTimeMillis(), // timestamp (optional)
"key", // key (optional)
"value", // value
headers // headers (optional)
);Producer 메타데이터
Producer는 클러스터 메타데이터를 주기적으로 갱신합니다:
- 토픽별 파티션 정보
- 각 파티션의 리더 브로커
- 사용 가능한 브로커 목록
메타데이터 갱신 주기는 metadata.max.age.ms 설정으로 제어됩니다.
성능 고려사항
1. 배치 처리
- 여러 메시지를 묶어서 전송하여 처리량 향상
batch.size와linger.ms설정 조정
2. 압축
- 네트워크 대역폭과 스토리지 절약
compression.type: none, gzip, snappy, lz4, zstd
3. 비동기 전송
- 애플리케이션 스레드 블로킹 방지
- Callback을 통한 전송 결과 처리
4. 버퍼 관리
buffer.memory: Producer가 사용할 수 있는 전체 메모리- 버퍼가 가득 차면
max.block.ms동안 블로킹
댓글 (0)