Batch Processing

Kafka는 배치 처리를 통해 네트워크 오버헤드를 줄이고 처리량을 극대화합니다.

배치 처리 개념

개별 전송 vs 배치 전송

개별 전송 (비효율적):
┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐
│M1 │→│M2 │→│M3 │→│M4 │→│M5 │→ Broker
└───┘ └───┘ └───┘ └───┘ └───┘
  ↑     ↑     ↑     ↑     ↑
  5번의 네트워크 왕복

배치 전송 (효율적):
┌─────────────────────────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │→ Broker
└─────────────────────────┘
              ↑
        1번의 네트워크 왕복

배치의 장점

항목개별 전송배치 전송
네트워크 오버헤드높음낮음
처리량낮음높음
압축 효율낮음높음
CPU 사용높음낮음
레이턴시낮음약간 높음

Producer 배치 처리

Record Accumulator

┌─────────────────────────────────────────────────────────────────┐
│                     Record Accumulator                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  send()                                                         │
│    │                                                            │
│    ▼                                                            │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ Topic-Partition Queues                                   │   │
│  │                                                          │   │
│  │  topic-0: [Batch 1] → [Batch 2] → [Batch 3 (filling)]   │   │
│  │  topic-1: [Batch 1] → [Batch 2 (filling)]               │   │
│  │  topic-2: [Batch 1 (filling)]                           │   │
│  │                                                          │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                  │
│                              ▼                                  │
│                        Sender Thread                            │
│                              │                                  │
│                              ▼                                  │
│                          Broker                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

배치 전송 조건

// 배치가 전송되는 조건:
// 1. batch.size 도달
// 2. linger.ms 경과
// 3. buffer.memory 부족
 
props.put("batch.size", "16384");       // 16KB 배치
props.put("linger.ms", "5");            // 최대 5ms 대기
props.put("buffer.memory", "33554432"); // 32MB 버퍼

배치 크기 설정 (batch.size)

batch.size = 16384 (16KB, 기본값)

작은 값 (1KB):
├── 장점: 낮은 레이턴시
├── 단점: 높은 네트워크 오버헤드
└── 적합: 실시간 요구사항

큰 값 (1MB):
├── 장점: 높은 처리량, 좋은 압축률
├── 단점: 높은 레이턴시, 메모리 사용
└── 적합: 배치 처리, 로그 수집

대기 시간 설정 (linger.ms)

linger.ms = 0 (기본값, 즉시 전송)

linger.ms = 0:
┌───┐
│ M │→ 즉시 전송 (배치가 작아도)
└───┘

linger.ms = 10:
┌───┐     10ms 대기     ┌─────────────┐
│ M │ ─────────────────>│ M │ M │ M │→ 배치 전송
└───┘   (더 많은 메시지  └─────────────┘
         수집)

설정 조합 예시

// 높은 처리량 (로그 수집)
props.put("batch.size", "65536");    // 64KB
props.put("linger.ms", "50");        // 50ms 대기
props.put("compression.type", "lz4");
 
// 낮은 레이턴시 (실시간 이벤트)
props.put("batch.size", "16384");    // 16KB
props.put("linger.ms", "0");         // 즉시 전송
 
// 균형잡힌 설정
props.put("batch.size", "32768");    // 32KB
props.put("linger.ms", "5");         // 5ms 대기

Consumer 배치 처리

Fetch 요청 배치

// Consumer는 한 번의 poll()로 여러 메시지 수신
props.put("fetch.min.bytes", "1");        // 최소 fetch 크기
props.put("fetch.max.bytes", "52428800"); // 최대 fetch 크기 (50MB)
props.put("max.poll.records", "500");     // poll당 최대 레코드 수
props.put("fetch.max.wait.ms", "500");    // 최대 대기 시간

Fetch 동작

┌─────────────────────────────────────────────────────────────────┐
│                      Consumer Fetch                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  poll() 호출                                                    │
│      │                                                          │
│      ▼                                                          │
│  ┌─────────────────────────────────────────┐                   │
│  │ Fetch Request                            │                   │
│  │ - 여러 파티션의 데이터 요청               │                   │
│  │ - fetch.min.bytes 만큼 대기 또는          │                   │
│  │ - fetch.max.wait.ms까지 대기             │                   │
│  └─────────────────────────────────────────┘                   │
│      │                                                          │
│      ▼                                                          │
│  ┌─────────────────────────────────────────┐                   │
│  │ Fetch Response                           │                   │
│  │ - 여러 파티션의 배치 데이터               │                   │
│  │ - max.poll.records까지 반환              │                   │
│  └─────────────────────────────────────────┘                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

배치 처리 패턴

while (running) {
    // 배치로 메시지 수신
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
    // 배치 단위로 처리
    if (!records.isEmpty()) {
        processBatch(records);
        consumer.commitSync();
    }
}
 
// 효율적인 배치 처리
private void processBatch(ConsumerRecords<String, String> records) {
    // 1. 배치 전처리
    List<ProcessedRecord> batch = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        batch.add(preprocess(record));
    }
 
    // 2. 배치 DB 쓰기
    database.batchInsert(batch);
 
    // 3. 배치 외부 API 호출
    externalApi.batchProcess(batch);
}

Broker 배치 처리

Request Batching

┌─────────────────────────────────────────────────────────────────┐
│                    Broker Request Handling                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Producer Request:                                              │
│  ┌─────────────────────────────────────────┐                   │
│  │ Produce Request                          │                   │
│  │ ├── topic-0, partition-0: [batch]       │                   │
│  │ ├── topic-0, partition-1: [batch]       │                   │
│  │ └── topic-1, partition-0: [batch]       │                   │
│  └─────────────────────────────────────────┘                   │
│                                                                 │
│  → 여러 파티션의 배치를 한 번의 요청으로 처리                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Log Append 배치

파티션 로그에 배치 단위로 기록:

┌─────────────────────────────────────────┐
│ .log 파일                                │
├─────────────────────────────────────────┤
│ [Record Batch 1] [Record Batch 2] ...   │
│ (여러 메시지)      (여러 메시지)          │
└─────────────────────────────────────────┘

한 번의 쓰기 연산으로 여러 메시지 저장

배치와 압축

배치 압축의 효율성

개별 메시지 압축:
┌───┐ → 압축 → ┌───┐ (오버헤드 높음)
│ M │         │ C │
└───┘         └───┘

배치 압축:
┌─────────────────┐ → 압축 → ┌───────┐ (효율적)
│ M │ M │ M │ M │         │   C   │
└─────────────────┘         └───────┘

압축률 비교

테스트 데이터: JSON 로그 1000건

개별 압축:
- 원본: 1,000 KB
- 압축 후: 400 KB (60% 압축률)

배치 압축:
- 원본: 1,000 KB
- 압축 후: 150 KB (85% 압축률)

성능 튜닝

Producer 최적화

Properties props = new Properties();
 
// 고처리량 설정
props.put("batch.size", "131072");        // 128KB
props.put("linger.ms", "20");             // 20ms 대기
props.put("buffer.memory", "67108864");   // 64MB 버퍼
props.put("compression.type", "lz4");
props.put("acks", "1");                   // 빠른 ACK
 
// 모니터링 메트릭
// batch-size-avg: 평균 배치 크기
// records-per-request-avg: 요청당 레코드 수
// record-queue-time-avg: 배치 대기 시간

Consumer 최적화

Properties props = new Properties();
 
// 고처리량 설정
props.put("fetch.min.bytes", "1048576");  // 1MB 최소 fetch
props.put("fetch.max.wait.ms", "500");    // 500ms 대기
props.put("max.poll.records", "1000");    // 1000개씩 처리
props.put("fetch.max.bytes", "104857600"); // 100MB 최대 fetch

처리량 vs 레이턴시 트레이드오프

┌─────────────────────────────────────────────────────────────────┐
│              Throughput vs Latency Trade-off                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Throughput                                                     │
│      ▲                                                          │
│      │                    ┌─────────────────                    │
│      │                ┌───┘                                     │
│      │            ┌───┘                                         │
│      │        ┌───┘                                             │
│      │    ┌───┘                                                 │
│      │────┘                                                     │
│      └──────────────────────────────────────────────▶ Latency   │
│           │                                                     │
│           │                                                     │
│    작은 배치,        균형점           큰 배치,                   │
│    linger.ms=0                      linger.ms=50               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

모니터링

JMX 메트릭

Producer 메트릭:
kafka.producer:type=producer-metrics,client-id={client-id}
├── batch-size-avg          # 평균 배치 크기 (bytes)
├── batch-size-max          # 최대 배치 크기
├── records-per-request-avg # 요청당 평균 레코드 수
├── record-queue-time-avg   # 배치 대기 평균 시간 (ms)
└── record-send-rate        # 초당 전송 레코드 수

Consumer 메트릭:
kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}
├── records-per-request-avg # 요청당 평균 레코드 수
├── fetch-size-avg          # 평균 fetch 크기 (bytes)
└── records-consumed-rate   # 초당 소비 레코드 수

최적 배치 크기 찾기

// 점진적으로 조정하며 모니터링
// 1. batch-size-avg가 batch.size에 가까운가?
// 2. record-queue-time-avg가 linger.ms에 가까운가?
// 3. 처리량과 레이턴시 목표 달성?
 
// 배치가 가득 차기 전에 전송되면:
// - linger.ms가 너무 짧음
// - 처리량 향상 여지 있음
 
// 배치가 항상 가득 차면:
// - batch.size 증가 고려
// - buffer.memory 충분한지 확인

관련 문서