Idempotent Producer (멱등성 프로듀서)
Idempotent Producer는 중복 메시지를 자동으로 방지하는 기능으로, Kafka 0.11부터 도입되었습니다.
멱등성(Idempotence)이란?
멱등성은 동일한 연산을 여러 번 수행해도 결과가 같은 성질을 의미합니다.
멱등성이 없는 경우의 문제
Producer → [메시지] → Broker → ACK 손실
↓ (재시도)
→ [메시지] → Broker (중복 저장!)
중복 발생 시나리오
- Producer가 메시지를 Broker로 전송
- Broker가 메시지를 저장하고 ACK 전송
- 네트워크 오류로 ACK가 Producer에 도달하지 못함
- Producer가 타임아웃으로 재시도
- Broker가 동일한 메시지를 다시 저장 (중복!)
멱등성이 있는 경우
Producer → [메시지(PID:1, Seq:0)] → Broker → ACK 손실
↓ (재시도)
→ [메시지(PID:1, Seq:0)] → Broker (중복 감지, 무시)
↓
ACK (성공)
Broker가 이미 처리한 메시지를 감지하고 중복을 방지합니다.
Idempotent Producer 활성화
설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 멱등성 활성화
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);기본 설정 (Kafka 3.0+)
Kafka 3.0부터는 기본적으로 활성화되어 있습니다:
// Kafka 3.0+에서는 명시적 설정 불필요
// enable.idempotence=true가 기본값자동 적용되는 설정
enable.idempotence=true를 설정하면 다음 설정이 자동으로 적용됩니다:
acks = all // 모든 ISR 레플리카의 확인 필요
retries = Integer.MAX_VALUE // 무제한 재시도
max.in.flight.requests.per.connection <= 5 // 최대 5개 요청만약 이 설정들과 충돌하는 값을 지정하면 에러가 발생합니다:
// ✗ 에러 발생!
props.put("enable.idempotence", true);
props.put("acks", "1"); // idempotence는 acks=all 필요동작 원리
1. Producer ID (PID)
각 Producer 인스턴스는 고유한 Producer ID를 받습니다:
Producer 시작 → Broker에게 PID 요청 → PID 할당 (예: 12345)
- PID는 Producer 세션 동안 유효
- Producer를 재시작하면 새로운 PID 할당
- 내부적으로만 사용되며 사용자에게 노출되지 않음
2. Sequence Number
각 메시지는 파티션별로 순차적인 시퀀스 번호를 받습니다:
Partition 0: Msg(PID:1, Seq:0), Msg(PID:1, Seq:1), Msg(PID:1, Seq:2), ...
Partition 1: Msg(PID:1, Seq:0), Msg(PID:1, Seq:1), Msg(PID:1, Seq:2), ...
- 각 파티션마다 독립적인 시퀀스 번호
- 0부터 시작하여 순차 증가
- Integer 범위 초과 시 0으로 리셋
3. 중복 감지 메커니즘
Broker는 각 파티션에 대해 (PID, Sequence Number) 매핑을 유지합니다:
// Broker의 내부 상태
Map<(ProducerId, Partition), LastSequenceNumber>
예시:
(PID:1, Partition:0) → 5 // 마지막으로 저장된 시퀀스 번호메시지 처리 로직
새 메시지 도착: (PID:1, Partition:0, Seq:6)
↓
브로커 확인: 현재 상태 = (PID:1, Partition:0) → 5
↓
Seq == LastSeq + 1 ?
/ \
Yes No
↓ ↓
저장 및 중복/순서 오류
상태 업데이트 ↓
↓ Seq == LastSeq ?
Success / \
Yes No
↓ ↓
중복 무시 순서 오류
(ACK 전송) (에러 반환)
4. 예제: 중복 방지 동작
Timeline:
T1: Producer → Msg(PID:1, Seq:0) → Broker
T2: Broker 저장 성공, 상태 업데이트: (PID:1, P:0) → 0
T3: Broker → ACK → Producer (네트워크 손실!)
T4: Producer 타임아웃, 재시도
T5: Producer → Msg(PID:1, Seq:0) → Broker
T6: Broker 확인: Seq(0) == LastSeq(0) → 중복!
T7: Broker → ACK (저장하지 않고 ACK만 전송)
T8: Producer 성공 수신
순서 보장
Idempotent Producer는 파티션 내 순서도 보장합니다.
max.in.flight.requests.per.connection
기본 Producer는 순서를 보장하지 않습니다:
# max.in.flight.requests.per.connection = 5인 경우
Request 1: Msg A → [전송 중...]
Request 2: Msg B → [전송 완료] // B가 먼저 도착!
Request 3: Msg C → [전송 중...]
결과: B, A, C (순서 깨짐)
Idempotent Producer의 순서 보장
props.put("enable.idempotence", true);
// max.in.flight.requests.per.connection은 자동으로 5 이하로 제한됨Broker가 시퀀스 번호로 순서를 검증:
Broker 수신: Msg(Seq:1) → 대기
Msg(Seq:0) → 저장
Msg(Seq:1) → 저장 (이제 Seq:0 다음이므로)
결과: 순서 보장됨
만약 순서가 맞지 않으면:
Broker 수신: Msg(Seq:2)
현재 상태: LastSeq = 0
예상: Seq = 1
결과: OutOfOrderSequenceException
제한사항
1. Producer 세션 범위
멱등성은 단일 Producer 세션 내에서만 보장됩니다:
// Producer 1
Producer p1 = new KafkaProducer<>(props); // PID: 100
p1.send(record); // Seq: 0
p1.close();
// Producer 2 (새로운 세션)
Producer p2 = new KafkaProducer<>(props); // PID: 101 (다름!)
p2.send(record); // Seq: 0 (새로 시작)
// p1과 p2 사이의 중복은 감지되지 않음!2. 파티션별 보장
중복 방지는 파티션별로만 보장됩니다:
// 같은 메시지를 다른 파티션으로 전송
record1 = new ProducerRecord<>("topic", 0, "key", "value");
record2 = new ProducerRecord<>("topic", 1, "key", "value");
producer.send(record1);
producer.send(record2);
// 서로 다른 파티션이므로 중복으로 간주되지 않음3. Broker 재시작
Broker가 재시작되면 PID-Sequence 매핑이 일부 손실될 수 있습니다:
Broker 재시작 → 메모리 내 매핑 손실
→ 로그에서 복구 (최근 것만)
→ 일부 중복 감지 불가
4. 토픽 삭제 및 재생성
토픽을 삭제하고 재생성하면 매핑이 초기화됩니다.
성능 영향
처리량
- 최소: 멱등성으로 인한 오버헤드는 약 3-5%
acks=all로 인한 영향이 더 큼 (약 10-20%)
레이턴시
- PID 할당: 초기 한 번만 (무시할 수 있는 수준)
- 시퀀스 번호 검증: 매우 빠름 (메모리 조회)
메모리
Broker는 각 (PID, Partition) 조합의 마지막 시퀀스 번호를 메모리에 유지:
메모리 사용량 = 활성 PID 수 × 파티션 수 × 8 bytes (대략)
예시:
- 1000개 Producer
- 10개 파티션
- 약 80KB 메모리
사용 예제
기본 사용
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
// 중복이 자동으로 방지됨
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 에러 처리
exception.printStackTrace();
} else {
System.out.printf("Sent to partition %d at offset %d%n",
metadata.partition(), metadata.offset());
}
});
producer.close();재시도와 함께 사용
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 멱등성 + 무한 재시도
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000); // 2분 이내
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 네트워크 오류가 발생해도 자동 재시도하며 중복 방지
producer.send(record);Transactional Producer와의 관계
Transactional Producer는 Idempotent Producer를 기반으로 합니다:
Transactional Producer
↓
내부적으로
↓
Idempotent Producer 사용
트랜잭션을 사용하면 자동으로 멱등성이 활성화됩니다:
props.put("transactional.id", "my-transactional-id");
// enable.idempotence=true 자동 적용차이점:
- Idempotent: 단일 세션, 중복 방지
- Transactional: 다중 세션, 중복 방지 + 원자성 보장
관련 문서: Transactional Producer
Best Practices
1. 기본적으로 활성화 (Kafka 3.0+)
// Kafka 3.0+에서는 자동 활성화되므로 확인만
// enable.idempotence=true (기본값)2. 이전 버전에서는 명시적 활성화
// Kafka 3.0 이전 버전
props.put("enable.idempotence", true);3. Producer 인스턴스 재사용
// ✓ 좋은 예: 하나의 Producer를 재사용
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("topic", "key", "value" + i));
}
producer.close();
// ✗ 나쁜 예: 매번 새로운 Producer 생성
for (int i = 0; i < 1000; i++) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "key", "value" + i));
producer.close(); // 새로운 PID 할당, 중복 감지 안 됨
}4. 충분한 재시도 설정
props.put("enable.idempotence", true);
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", 120000);5. 에러 처리
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof OutOfOrderSequenceException) {
// 순서 오류 - 심각한 문제, 로그 및 알림
logger.error("Sequence error", exception);
} else if (exception instanceof ProducerFencedException) {
// Producer가 펜싱됨 - 종료 필요
logger.error("Producer fenced", exception);
producer.close();
}
}
});문제 해결
OutOfOrderSequenceException
원인: 시퀀스 번호가 예상과 다름
해결:
- 로그 확인 및 분석
- Producer 재시작
- 네트워크 안정성 확인
// 에러 로그 예시
org.apache.kafka.common.errors.OutOfOrderSequenceException:
The broker received an out of order sequence numberProducerFencedException
원인: 같은 transactional.id를 사용하는 다른 Producer가 활성화됨
해결:
- Producer 종료
transactional.id확인- 중복 Producer 인스턴스 제거
멱등성 vs 트랜잭션 선택
| 요구사항 | Idempotent | Transactional |
|---|---|---|
| 중복 방지 | ✓ | ✓ |
| 순서 보장 (파티션 내) | ✓ | ✓ |
| 다중 파티션 원자성 | ✗ | ✓ |
| Producer 재시작 후 보장 | ✗ | ✓ |
| Read Committed | ✗ | ✓ |
| 성능 오버헤드 | 낮음 | 중간 |
선택 가이드:
- Idempotent: 대부분의 경우 (기본값)
- Transactional: Exactly-once semantics 필요 시
댓글 (0)