Exactly-once Semantics 구현

Kafka에서 Exactly-once Semantics(EOS)를 구현하는 방법과 내부 동작 원리를 살펴봅니다.

Exactly-once의 범위

EOS가 적용되는 범위

┌─────────────────────────────────────────────────────────────────┐
│                    Kafka Exactly-once 범위                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────┐     ┌──────────────────┐     ┌──────────┐        │
│  │ Producer │────>│   Kafka Cluster  │────>│ Consumer │        │
│  └──────────┘     └──────────────────┘     └──────────┘        │
│       │                    │                    │               │
│       └────────────────────┴────────────────────┘               │
│              Kafka 내부에서 EOS 보장                              │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│  외부 시스템과의 EOS는 별도 구현 필요                              │
│  - 데이터베이스: 트랜잭션 + 오프셋 저장                            │
│  - 파일 시스템: 원자적 쓰기 + 체크포인트                           │
└─────────────────────────────────────────────────────────────────┘

EOS 구현 단계

단계기능도입 버전
Idempotent ProducerProducer→Broker 중복 방지Kafka 0.11
Transactional API다중 파티션 원자적 쓰기Kafka 0.11
Exactly-once StreamsKafka Streams EOSKafka 0.11
EOS v2개선된 트랜잭션 프로토콜Kafka 2.5

Idempotent Producer 구현

동작 원리

┌─────────────┐                      ┌─────────────┐
│  Producer   │                      │   Broker    │
│             │                      │             │
│ PID: 1000   │                      │             │
│ Seq: 0,1,2  │                      │             │
└─────────────┘                      └─────────────┘
      │                                    │
      │─── (PID=1000, Seq=0, msg1) ───────>│
      │<── ACK ────────────────────────────│
      │                                    │
      │─── (PID=1000, Seq=1, msg2) ───────>│
      │    (네트워크 타임아웃, ACK 손실)     │
      │                                    │
      │─── (PID=1000, Seq=1, msg2) ───────>│  재전송
      │    Broker: "Seq=1 이미 있음, 무시"  │  → 중복 방지
      │<── ACK ────────────────────────────│

Producer ID (PID)와 Sequence Number

  • PID: Producer 인스턴스별 고유 ID (브로커가 할당)
  • Sequence Number: 메시지별 순차 번호 (파티션별로 관리)
  • 조합: (PID, Partition, Sequence) → 중복 감지

설정 방법

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// Idempotent Producer 활성화
props.put("enable.idempotence", "true");
 
// 자동으로 적용되는 설정:
// - acks = all
// - retries = Integer.MAX_VALUE
// - max.in.flight.requests.per.connection <= 5
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

제한사항

Idempotent Producer의 한계:
├── 단일 Producer 세션 내에서만 보장
├── Producer 재시작 시 새 PID 할당 → 이전 중복 감지 불가
├── 단일 파티션에 대한 쓰기만 보장
└── 다중 파티션 원자성 미보장

Transactional Producer 구현

트랜잭션 흐름

┌──────────────────────────────────────────────────────────────────┐
│                     Transaction Flow                              │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Producer              Transaction           Broker              │
│     │                  Coordinator              │                │
│     │                      │                    │                │
│     │── initTransactions ─>│                    │                │
│     │<── PID assigned ─────│                    │                │
│     │                      │                    │                │
│     │── beginTransaction ─>│                    │                │
│     │                      │                    │                │
│     │────────────────────── send(topic1) ──────>│                │
│     │────────────────────── send(topic2) ──────>│                │
│     │                      │                    │                │
│     │── commitTransaction ─>                    │                │
│     │                      │── write markers ──>│                │
│     │<── committed ────────│                    │                │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

전체 구현 예제

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 트랜잭션 설정
props.put("transactional.id", "order-processing-tx-1");  // 필수: 고유 ID
props.put("enable.idempotence", "true");  // 자동 활성화됨
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
// 트랜잭션 초기화 (Producer 시작 시 1회)
producer.initTransactions();
 
try {
    // 트랜잭션 시작
    producer.beginTransaction();
 
    // 여러 토픽/파티션에 원자적 쓰기
    producer.send(new ProducerRecord<>("orders", "order-123", "created"));
    producer.send(new ProducerRecord<>("inventory", "item-456", "reserved"));
    producer.send(new ProducerRecord<>("notifications", "user-789", "order placed"));
 
    // 모든 메시지가 성공적으로 커밋됨
    producer.commitTransaction();
 
} catch (ProducerFencedException e) {
    // 다른 Producer가 같은 transactional.id로 시작됨
    producer.close();
    throw e;
 
} catch (KafkaException e) {
    // 트랜잭션 실패 - 롤백
    producer.abortTransaction();
    throw e;
}

transactional.id의 역할

// transactional.id는 Producer 인스턴스를 식별
props.put("transactional.id", "my-app-instance-1");

동작 방식:

  1. 같은 transactional.id로 새 Producer 시작 시 이전 Producer는 “fenced”
  2. 미완료 트랜잭션 자동 롤백
  3. Producer 재시작 후에도 트랜잭션 일관성 유지
Producer A (transactional.id = "tx-1")
     │
     │── beginTransaction
     │── send messages...
     │   (장애 발생)
     │
     ▼
Producer B (transactional.id = "tx-1") 시작
     │
     │── initTransactions
     │   └── Producer A fenced
     │   └── A의 미완료 트랜잭션 abort
     │
     └── 정상 트랜잭션 시작

Consumer의 Exactly-once 읽기

read_committed 격리 수준

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "exactly-once-consumer");
 
// 커밋된 메시지만 읽음
props.put("isolation.level", "read_committed");
 
// 수동 커밋 사용
props.put("enable.auto.commit", "false");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

isolation.level 비교

설정동작사용 시나리오
read_uncommitted모든 메시지 읽음 (기본값)트랜잭션 미사용
read_committed커밋된 메시지만 읽음EOS 구현 시

read_committed 동작 방식

Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ M8 │
└────┴────┴────┴────┴────┴────┴────┴────┘
  ↑         ↑              ↑         ↑
  │         │              │         │
Committed  In-progress   Committed  Pending
 TX-A       TX-B          TX-A      TX-B

read_uncommitted: M1 ~ M8 모두 읽음
read_committed:   M1, M3, M4, M5 읽음 (TX-A만)
                  TX-B 커밋 전까지 M2, M6~M8 대기

Consume-Transform-Produce 패턴

개념

입력 토픽의 메시지를 처리하고 출력 토픽에 쓰는 패턴입니다.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Input Topic │────>│  Processor  │────>│Output Topic │
└─────────────┘     └─────────────┘     └─────────────┘
                          │
                          │ Exactly-once:
                          │ - Consumer offset 커밋
                          │ - Output 메시지 쓰기
                          │ - 둘 다 같은 트랜잭션

구현 예제

// Producer 설정
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "consume-transform-produce-1");
 
// Consumer 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "ctp-group");
consumerProps.put("isolation.level", "read_committed");
consumerProps.put("enable.auto.commit", "false");
 
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
 
producer.initTransactions();
consumer.subscribe(Arrays.asList("input-topic"));
 
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
    if (!records.isEmpty()) {
        producer.beginTransaction();
 
        try {
            for (ConsumerRecord<String, String> record : records) {
                // 변환 처리
                String transformedValue = transform(record.value());
 
                // 출력 토픽에 쓰기
                producer.send(new ProducerRecord<>(
                    "output-topic",
                    record.key(),
                    transformedValue
                ));
            }
 
            // Consumer 오프셋을 트랜잭션에 포함
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords =
                    records.records(partition);
                long lastOffset = partitionRecords
                    .get(partitionRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
 
            // 오프셋 커밋을 트랜잭션에 포함
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
 
            // 커밋: 메시지 쓰기 + 오프셋 커밋이 원자적으로 처리
            producer.commitTransaction();
 
        } catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

Kafka Streams의 EOS

설정 방법

Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-streams");
 
// EOS 활성화 (Kafka 2.5+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);
 
// Kafka 0.11 ~ 2.4
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
//     StreamsConfig.EXACTLY_ONCE);
 
StreamsBuilder builder = new StreamsBuilder();
 
// 스트림 처리 토폴로지
builder.<String, String>stream("input-topic")
    .mapValues(value -> processValue(value))
    .to("output-topic");
 
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

EXACTLY_ONCE vs EXACTLY_ONCE_V2

특성EXACTLY_ONCE (v1)EXACTLY_ONCE_V2
도입 버전Kafka 0.11Kafka 2.5
트랜잭션 ID파티션별태스크별
오버헤드높음낮음
확장성제한적개선됨
권장 사용레거시 호환새 애플리케이션

Kafka Streams EOS 내부 동작

┌────────────────────────────────────────────────────────────────┐
│                    Kafka Streams EOS                            │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  1. Consumer가 input-topic에서 메시지 읽음                       │
│                         ↓                                       │
│  2. Processor가 메시지 처리 (상태 저장소 업데이트 포함)            │
│                         ↓                                       │
│  3. Transaction 시작                                            │
│      - Output 메시지 쓰기                                        │
│      - 상태 저장소 변경 (changelog topic)                        │
│      - Consumer 오프셋 커밋                                      │
│                         ↓                                       │
│  4. Transaction 커밋 (all or nothing)                           │
│                                                                │
└────────────────────────────────────────────────────────────────┘

외부 시스템과의 EOS

데이터베이스 연동

public class ExactlyOnceToDatabase {
 
    private final KafkaConsumer<String, String> consumer;
    private final DataSource dataSource;
 
    public void process() {
        while (true) {
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
 
            for (ConsumerRecord<String, String> record : records) {
                Connection conn = dataSource.getConnection();
                try {
                    conn.setAutoCommit(false);
 
                    // 1. 비즈니스 데이터 저장
                    saveBusinessData(conn, record);
 
                    // 2. 오프셋 저장 (같은 트랜잭션)
                    saveOffset(conn, record.topic(),
                        record.partition(), record.offset());
 
                    conn.commit();
                } catch (Exception e) {
                    conn.rollback();
                    throw e;
                } finally {
                    conn.close();
                }
            }
            // Kafka에는 커밋하지 않음 - DB가 source of truth
        }
    }
 
    // Consumer 재시작 시 DB에서 오프셋 복원
    public void initializeOffsets() {
        for (TopicPartition partition : consumer.assignment()) {
            Long savedOffset = getOffsetFromDB(
                partition.topic(), partition.partition());
            if (savedOffset != null) {
                consumer.seek(partition, savedOffset + 1);
            }
        }
    }
}

Outbox 패턴

// 1. 비즈니스 로직과 이벤트를 하나의 DB 트랜잭션으로
@Transactional
public void processOrder(Order order) {
    // 비즈니스 데이터 저장
    orderRepository.save(order);
 
    // Outbox 테이블에 이벤트 저장 (같은 트랜잭션)
    outboxRepository.save(new OutboxEvent(
        "orders",
        order.getId(),
        toJson(order)
    ));
}
 
// 2. 별도 프로세스가 Outbox → Kafka 발행
@Scheduled(fixedRate = 1000)
public void publishOutboxEvents() {
    List<OutboxEvent> events = outboxRepository.findUnpublished();
 
    for (OutboxEvent event : events) {
        producer.send(new ProducerRecord<>(
            event.getTopic(),
            event.getKey(),
            event.getPayload()
        )).get();
 
        event.markAsPublished();
        outboxRepository.save(event);
    }
}

성능 고려사항

트랜잭션 오버헤드

Non-transactional:
┌────────────────────────────────────────┐
│ send → ack → send → ack → send → ack  │
└────────────────────────────────────────┘

Transactional:
┌──────────────────────────────────────────────────────┐
│ begin → send → send → send → prepare → commit → ack │
│                                 ↑                    │
│                         추가 오버헤드                 │
└──────────────────────────────────────────────────────┘

최적화 전략

// 1. 배치 크기 증가
props.put("batch.size", "65536");  // 64KB
props.put("linger.ms", "10");
 
// 2. 트랜잭션당 메시지 수 증가
int batchCount = 0;
producer.beginTransaction();
 
for (ConsumerRecord<String, String> record : records) {
    producer.send(...);
    batchCount++;
 
    // 1000개마다 커밋
    if (batchCount >= 1000) {
        producer.commitTransaction();
        producer.beginTransaction();
        batchCount = 0;
    }
}
 
producer.commitTransaction();

성능 비교

모드상대 처리량레이턴시
Fire-and-forget100%최저
At-least-once80-90%낮음
Exactly-once50-70%중간

장애 처리

ProducerFencedException

try {
    producer.beginTransaction();
    // ...
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 같은 transactional.id로 다른 Producer가 시작됨
    // 이 Producer는 더 이상 사용할 수 없음
    log.error("Producer fenced, shutting down", e);
    producer.close();
    System.exit(1);
}

트랜잭션 타임아웃

// 트랜잭션 타임아웃 설정
props.put("transaction.timeout.ms", "60000");  // 기본 60초
 
// 타임아웃 초과 시 트랜잭션 자동 abort
// → 다음 beginTransaction에서 실패

관련 문서