Idempotent Producer (멱등성 프로듀서)

Idempotent Producer는 중복 메시지를 자동으로 방지하는 기능으로, Kafka 0.11부터 도입되었습니다.

멱등성(Idempotence)이란?

멱등성은 동일한 연산을 여러 번 수행해도 결과가 같은 성질을 의미합니다.

멱등성이 없는 경우의 문제

Producer → [메시지] → Broker → ACK 손실
         ↓ (재시도)
         → [메시지] → Broker (중복 저장!)

중복 발생 시나리오

  1. Producer가 메시지를 Broker로 전송
  2. Broker가 메시지를 저장하고 ACK 전송
  3. 네트워크 오류로 ACK가 Producer에 도달하지 못함
  4. Producer가 타임아웃으로 재시도
  5. Broker가 동일한 메시지를 다시 저장 (중복!)

멱등성이 있는 경우

Producer → [메시지(PID:1, Seq:0)] → Broker → ACK 손실
         ↓ (재시도)
         → [메시지(PID:1, Seq:0)] → Broker (중복 감지, 무시)
                                    ↓
                                   ACK (성공)

Broker가 이미 처리한 메시지를 감지하고 중복을 방지합니다.

Idempotent Producer 활성화

설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 멱등성 활성화
props.put("enable.idempotence", true);
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

기본 설정 (Kafka 3.0+)

Kafka 3.0부터는 기본적으로 활성화되어 있습니다:

// Kafka 3.0+에서는 명시적 설정 불필요
// enable.idempotence=true가 기본값

자동 적용되는 설정

enable.idempotence=true를 설정하면 다음 설정이 자동으로 적용됩니다:

acks = all  // 모든 ISR 레플리카의 확인 필요
retries = Integer.MAX_VALUE  // 무제한 재시도
max.in.flight.requests.per.connection <= 5  // 최대 5개 요청

만약 이 설정들과 충돌하는 값을 지정하면 에러가 발생합니다:

// ✗ 에러 발생!
props.put("enable.idempotence", true);
props.put("acks", "1");  // idempotence는 acks=all 필요

동작 원리

1. Producer ID (PID)

각 Producer 인스턴스는 고유한 Producer ID를 받습니다:

Producer 시작 → Broker에게 PID 요청 → PID 할당 (예: 12345)
  • PID는 Producer 세션 동안 유효
  • Producer를 재시작하면 새로운 PID 할당
  • 내부적으로만 사용되며 사용자에게 노출되지 않음

2. Sequence Number

각 메시지는 파티션별로 순차적인 시퀀스 번호를 받습니다:

Partition 0: Msg(PID:1, Seq:0), Msg(PID:1, Seq:1), Msg(PID:1, Seq:2), ...
Partition 1: Msg(PID:1, Seq:0), Msg(PID:1, Seq:1), Msg(PID:1, Seq:2), ...
  • 각 파티션마다 독립적인 시퀀스 번호
  • 0부터 시작하여 순차 증가
  • Integer 범위 초과 시 0으로 리셋

3. 중복 감지 메커니즘

Broker는 각 파티션에 대해 (PID, Sequence Number) 매핑을 유지합니다:

// Broker의 내부 상태
Map<(ProducerId, Partition), LastSequenceNumber>
 
예시:
(PID:1, Partition:0) → 5  // 마지막으로 저장된 시퀀스 번호

메시지 처리 로직

새 메시지 도착: (PID:1, Partition:0, Seq:6)
                    ↓
브로커 확인: 현재 상태 = (PID:1, Partition:0) → 5
                    ↓
           Seq == LastSeq + 1 ?
          /                    \
        Yes                    No
         ↓                      ↓
      저장 및                 중복/순서 오류
      상태 업데이트              ↓
         ↓                  Seq == LastSeq ?
      Success               /              \
                          Yes              No
                           ↓                ↓
                      중복 무시         순서 오류
                     (ACK 전송)       (에러 반환)

4. 예제: 중복 방지 동작

Timeline:
T1: Producer → Msg(PID:1, Seq:0) → Broker
T2: Broker 저장 성공, 상태 업데이트: (PID:1, P:0) → 0
T3: Broker → ACK → Producer (네트워크 손실!)
T4: Producer 타임아웃, 재시도
T5: Producer → Msg(PID:1, Seq:0) → Broker
T6: Broker 확인: Seq(0) == LastSeq(0) → 중복!
T7: Broker → ACK (저장하지 않고 ACK만 전송)
T8: Producer 성공 수신

순서 보장

Idempotent Producer는 파티션 내 순서도 보장합니다.

max.in.flight.requests.per.connection

기본 Producer는 순서를 보장하지 않습니다:

# max.in.flight.requests.per.connection = 5인 경우

Request 1: Msg A → [전송 중...]
Request 2: Msg B → [전송 완료]  // B가 먼저 도착!
Request 3: Msg C → [전송 중...]

결과: B, A, C (순서 깨짐)

Idempotent Producer의 순서 보장

props.put("enable.idempotence", true);
// max.in.flight.requests.per.connection은 자동으로 5 이하로 제한됨

Broker가 시퀀스 번호로 순서를 검증:

Broker 수신: Msg(Seq:1) → 대기
             Msg(Seq:0) → 저장
             Msg(Seq:1) → 저장 (이제 Seq:0 다음이므로)

결과: 순서 보장됨

만약 순서가 맞지 않으면:

Broker 수신: Msg(Seq:2)
현재 상태: LastSeq = 0
예상: Seq = 1
결과: OutOfOrderSequenceException

제한사항

1. Producer 세션 범위

멱등성은 단일 Producer 세션 내에서만 보장됩니다:

// Producer 1
Producer p1 = new KafkaProducer<>(props);  // PID: 100
p1.send(record);  // Seq: 0
p1.close();
 
// Producer 2 (새로운 세션)
Producer p2 = new KafkaProducer<>(props);  // PID: 101 (다름!)
p2.send(record);  // Seq: 0 (새로 시작)
// p1과 p2 사이의 중복은 감지되지 않음!

2. 파티션별 보장

중복 방지는 파티션별로만 보장됩니다:

// 같은 메시지를 다른 파티션으로 전송
record1 = new ProducerRecord<>("topic", 0, "key", "value");
record2 = new ProducerRecord<>("topic", 1, "key", "value");
 
producer.send(record1);
producer.send(record2);
// 서로 다른 파티션이므로 중복으로 간주되지 않음

3. Broker 재시작

Broker가 재시작되면 PID-Sequence 매핑이 일부 손실될 수 있습니다:

Broker 재시작 → 메모리 내 매핑 손실
              → 로그에서 복구 (최근 것만)
              → 일부 중복 감지 불가

4. 토픽 삭제 및 재생성

토픽을 삭제하고 재생성하면 매핑이 초기화됩니다.

성능 영향

처리량

  • 최소: 멱등성으로 인한 오버헤드는 약 3-5%
  • acks=all로 인한 영향이 더 큼 (약 10-20%)

레이턴시

  • PID 할당: 초기 한 번만 (무시할 수 있는 수준)
  • 시퀀스 번호 검증: 매우 빠름 (메모리 조회)

메모리

Broker는 각 (PID, Partition) 조합의 마지막 시퀀스 번호를 메모리에 유지:

메모리 사용량 = 활성 PID 수 × 파티션 수 × 8 bytes (대략)

예시:
- 1000개 Producer
- 10개 파티션
- 약 80KB 메모리

사용 예제

기본 사용

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", true);
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");
 
// 중복이 자동으로 방지됨
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 에러 처리
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to partition %d at offset %d%n",
            metadata.partition(), metadata.offset());
    }
});
 
producer.close();

재시도와 함께 사용

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 멱등성 + 무한 재시도
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000);  // 2분 이내
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
// 네트워크 오류가 발생해도 자동 재시도하며 중복 방지
producer.send(record);

Transactional Producer와의 관계

Transactional Producer는 Idempotent Producer를 기반으로 합니다:

Transactional Producer
        ↓
    내부적으로
        ↓
Idempotent Producer 사용

트랜잭션을 사용하면 자동으로 멱등성이 활성화됩니다:

props.put("transactional.id", "my-transactional-id");
// enable.idempotence=true 자동 적용

차이점:

  • Idempotent: 단일 세션, 중복 방지
  • Transactional: 다중 세션, 중복 방지 + 원자성 보장

관련 문서: Transactional Producer

Best Practices

1. 기본적으로 활성화 (Kafka 3.0+)

// Kafka 3.0+에서는 자동 활성화되므로 확인만
// enable.idempotence=true (기본값)

2. 이전 버전에서는 명시적 활성화

// Kafka 3.0 이전 버전
props.put("enable.idempotence", true);

3. Producer 인스턴스 재사용

// ✓ 좋은 예: 하나의 Producer를 재사용
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
for (int i = 0; i < 1000; i++) {
    producer.send(new ProducerRecord<>("topic", "key", "value" + i));
}
 
producer.close();
 
// ✗ 나쁜 예: 매번 새로운 Producer 생성
for (int i = 0; i < 1000; i++) {
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("topic", "key", "value" + i));
    producer.close();  // 새로운 PID 할당, 중복 감지 안 됨
}

4. 충분한 재시도 설정

props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000);

5. 에러 처리

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof OutOfOrderSequenceException) {
            // 순서 오류 - 심각한 문제, 로그 및 알림
            logger.error("Sequence error", exception);
        } else if (exception instanceof ProducerFencedException) {
            // Producer가 펜싱됨 - 종료 필요
            logger.error("Producer fenced", exception);
            producer.close();
        }
    }
});

문제 해결

OutOfOrderSequenceException

원인: 시퀀스 번호가 예상과 다름

해결:

  1. 로그 확인 및 분석
  2. Producer 재시작
  3. 네트워크 안정성 확인
// 에러 로그 예시
org.apache.kafka.common.errors.OutOfOrderSequenceException:
  The broker received an out of order sequence number

ProducerFencedException

원인: 같은 transactional.id를 사용하는 다른 Producer가 활성화됨

해결:

  1. Producer 종료
  2. transactional.id 확인
  3. 중복 Producer 인스턴스 제거

멱등성 vs 트랜잭션 선택

요구사항IdempotentTransactional
중복 방지
순서 보장 (파티션 내)
다중 파티션 원자성
Producer 재시작 후 보장
Read Committed
성능 오버헤드낮음중간

선택 가이드:

  • Idempotent: 대부분의 경우 (기본값)
  • Transactional: Exactly-once semantics 필요 시

관련 문서