Batch Processing
Kafka는 배치 처리를 통해 네트워크 오버헤드를 줄이고 처리량을 극대화합니다.
배치 처리 개념
개별 전송 vs 배치 전송
개별 전송 (비효율적):
┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐
│M1 │→│M2 │→│M3 │→│M4 │→│M5 │→ Broker
└───┘ └───┘ └───┘ └───┘ └───┘
↑ ↑ ↑ ↑ ↑
5번의 네트워크 왕복
배치 전송 (효율적):
┌─────────────────────────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │→ Broker
└─────────────────────────┘
↑
1번의 네트워크 왕복
배치의 장점
| 항목 | 개별 전송 | 배치 전송 |
|---|---|---|
| 네트워크 오버헤드 | 높음 | 낮음 |
| 처리량 | 낮음 | 높음 |
| 압축 효율 | 낮음 | 높음 |
| CPU 사용 | 높음 | 낮음 |
| 레이턴시 | 낮음 | 약간 높음 |
Producer 배치 처리
Record Accumulator
┌─────────────────────────────────────────────────────────────────┐
│ Record Accumulator │
├─────────────────────────────────────────────────────────────────┤
│ │
│ send() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Topic-Partition Queues │ │
│ │ │ │
│ │ topic-0: [Batch 1] → [Batch 2] → [Batch 3 (filling)] │ │
│ │ topic-1: [Batch 1] → [Batch 2 (filling)] │ │
│ │ topic-2: [Batch 1 (filling)] │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Sender Thread │
│ │ │
│ ▼ │
│ Broker │
│ │
└─────────────────────────────────────────────────────────────────┘
배치 전송 조건
// 배치가 전송되는 조건:
// 1. batch.size 도달
// 2. linger.ms 경과
// 3. buffer.memory 부족
props.put("batch.size", "16384"); // 16KB 배치
props.put("linger.ms", "5"); // 최대 5ms 대기
props.put("buffer.memory", "33554432"); // 32MB 버퍼배치 크기 설정 (batch.size)
batch.size = 16384 (16KB, 기본값)
작은 값 (1KB):
├── 장점: 낮은 레이턴시
├── 단점: 높은 네트워크 오버헤드
└── 적합: 실시간 요구사항
큰 값 (1MB):
├── 장점: 높은 처리량, 좋은 압축률
├── 단점: 높은 레이턴시, 메모리 사용
└── 적합: 배치 처리, 로그 수집
대기 시간 설정 (linger.ms)
linger.ms = 0 (기본값, 즉시 전송)
linger.ms = 0:
┌───┐
│ M │→ 즉시 전송 (배치가 작아도)
└───┘
linger.ms = 10:
┌───┐ 10ms 대기 ┌─────────────┐
│ M │ ─────────────────>│ M │ M │ M │→ 배치 전송
└───┘ (더 많은 메시지 └─────────────┘
수집)
설정 조합 예시
// 높은 처리량 (로그 수집)
props.put("batch.size", "65536"); // 64KB
props.put("linger.ms", "50"); // 50ms 대기
props.put("compression.type", "lz4");
// 낮은 레이턴시 (실시간 이벤트)
props.put("batch.size", "16384"); // 16KB
props.put("linger.ms", "0"); // 즉시 전송
// 균형잡힌 설정
props.put("batch.size", "32768"); // 32KB
props.put("linger.ms", "5"); // 5ms 대기Consumer 배치 처리
Fetch 요청 배치
// Consumer는 한 번의 poll()로 여러 메시지 수신
props.put("fetch.min.bytes", "1"); // 최소 fetch 크기
props.put("fetch.max.bytes", "52428800"); // 최대 fetch 크기 (50MB)
props.put("max.poll.records", "500"); // poll당 최대 레코드 수
props.put("fetch.max.wait.ms", "500"); // 최대 대기 시간Fetch 동작
┌─────────────────────────────────────────────────────────────────┐
│ Consumer Fetch │
├─────────────────────────────────────────────────────────────────┤
│ │
│ poll() 호출 │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Fetch Request │ │
│ │ - 여러 파티션의 데이터 요청 │ │
│ │ - fetch.min.bytes 만큼 대기 또는 │ │
│ │ - fetch.max.wait.ms까지 대기 │ │
│ └─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Fetch Response │ │
│ │ - 여러 파티션의 배치 데이터 │ │
│ │ - max.poll.records까지 반환 │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
배치 처리 패턴
while (running) {
// 배치로 메시지 수신
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 배치 단위로 처리
if (!records.isEmpty()) {
processBatch(records);
consumer.commitSync();
}
}
// 효율적인 배치 처리
private void processBatch(ConsumerRecords<String, String> records) {
// 1. 배치 전처리
List<ProcessedRecord> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
batch.add(preprocess(record));
}
// 2. 배치 DB 쓰기
database.batchInsert(batch);
// 3. 배치 외부 API 호출
externalApi.batchProcess(batch);
}Broker 배치 처리
Request Batching
┌─────────────────────────────────────────────────────────────────┐
│ Broker Request Handling │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer Request: │
│ ┌─────────────────────────────────────────┐ │
│ │ Produce Request │ │
│ │ ├── topic-0, partition-0: [batch] │ │
│ │ ├── topic-0, partition-1: [batch] │ │
│ │ └── topic-1, partition-0: [batch] │ │
│ └─────────────────────────────────────────┘ │
│ │
│ → 여러 파티션의 배치를 한 번의 요청으로 처리 │
│ │
└─────────────────────────────────────────────────────────────────┘
Log Append 배치
파티션 로그에 배치 단위로 기록:
┌─────────────────────────────────────────┐
│ .log 파일 │
├─────────────────────────────────────────┤
│ [Record Batch 1] [Record Batch 2] ... │
│ (여러 메시지) (여러 메시지) │
└─────────────────────────────────────────┘
한 번의 쓰기 연산으로 여러 메시지 저장
배치와 압축
배치 압축의 효율성
개별 메시지 압축:
┌───┐ → 압축 → ┌───┐ (오버헤드 높음)
│ M │ │ C │
└───┘ └───┘
배치 압축:
┌─────────────────┐ → 압축 → ┌───────┐ (효율적)
│ M │ M │ M │ M │ │ C │
└─────────────────┘ └───────┘
압축률 비교
테스트 데이터: JSON 로그 1000건
개별 압축:
- 원본: 1,000 KB
- 압축 후: 400 KB (60% 압축률)
배치 압축:
- 원본: 1,000 KB
- 압축 후: 150 KB (85% 압축률)
성능 튜닝
Producer 최적화
Properties props = new Properties();
// 고처리량 설정
props.put("batch.size", "131072"); // 128KB
props.put("linger.ms", "20"); // 20ms 대기
props.put("buffer.memory", "67108864"); // 64MB 버퍼
props.put("compression.type", "lz4");
props.put("acks", "1"); // 빠른 ACK
// 모니터링 메트릭
// batch-size-avg: 평균 배치 크기
// records-per-request-avg: 요청당 레코드 수
// record-queue-time-avg: 배치 대기 시간Consumer 최적화
Properties props = new Properties();
// 고처리량 설정
props.put("fetch.min.bytes", "1048576"); // 1MB 최소 fetch
props.put("fetch.max.wait.ms", "500"); // 500ms 대기
props.put("max.poll.records", "1000"); // 1000개씩 처리
props.put("fetch.max.bytes", "104857600"); // 100MB 최대 fetch처리량 vs 레이턴시 트레이드오프
┌─────────────────────────────────────────────────────────────────┐
│ Throughput vs Latency Trade-off │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Throughput │
│ ▲ │
│ │ ┌───────────────── │
│ │ ┌───┘ │
│ │ ┌───┘ │
│ │ ┌───┘ │
│ │ ┌───┘ │
│ │────┘ │
│ └──────────────────────────────────────────────▶ Latency │
│ │ │
│ │ │
│ 작은 배치, 균형점 큰 배치, │
│ linger.ms=0 linger.ms=50 │
│ │
└─────────────────────────────────────────────────────────────────┘
모니터링
JMX 메트릭
Producer 메트릭:
kafka.producer:type=producer-metrics,client-id={client-id}
├── batch-size-avg # 평균 배치 크기 (bytes)
├── batch-size-max # 최대 배치 크기
├── records-per-request-avg # 요청당 평균 레코드 수
├── record-queue-time-avg # 배치 대기 평균 시간 (ms)
└── record-send-rate # 초당 전송 레코드 수
Consumer 메트릭:
kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}
├── records-per-request-avg # 요청당 평균 레코드 수
├── fetch-size-avg # 평균 fetch 크기 (bytes)
└── records-consumed-rate # 초당 소비 레코드 수
최적 배치 크기 찾기
// 점진적으로 조정하며 모니터링
// 1. batch-size-avg가 batch.size에 가까운가?
// 2. record-queue-time-avg가 linger.ms에 가까운가?
// 3. 처리량과 레이턴시 목표 달성?
// 배치가 가득 차기 전에 전송되면:
// - linger.ms가 너무 짧음
// - 처리량 향상 여지 있음
// 배치가 항상 가득 차면:
// - batch.size 증가 고려
// - buffer.memory 충분한지 확인
댓글 (0)