Transactional Producer (트랜잭션 프로듀서)

Transactional Producer는 Kafka에서 Exactly-once semantics를 구현하기 위한 핵심 기능입니다. 여러 파티션에 대한 메시지 전송을 원자적으로 처리할 수 있습니다.

트랜잭션이란?

트랜잭션은 여러 작업을 하나의 논리적 단위로 묶어서 모두 성공하거나 모두 실패하도록 보장합니다.

트랜잭션이 필요한 경우

1. 다중 파티션 원자성

// 주문 생성과 재고 감소를 원자적으로 처리
beginTransaction();
send("orders", order);          // 파티션 0
send("inventory", deduction);   // 파티션 1
commitTransaction();
// 둘 다 성공하거나 둘 다 실패

2. Read-Process-Write 패턴

// Consumer에서 읽고 처리한 후 Producer로 전송
beginTransaction();
records = consumer.poll();
for (record : records) {
    result = process(record);
    producer.send(result);
}
commitOffsets();
commitTransaction();
// 오프셋 커밋과 메시지 전송이 원자적으로 처리

3. Exactly-once 보장

Consumer가 트랜잭션이 완전히 커밋된 메시지만 읽도록 보장합니다.

Transactional Producer 설정

1. Producer 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
// 트랜잭션 필수 설정
props.put("transactional.id", "my-transactional-id");
 
// 다음은 자동으로 설정됨
// enable.idempotence=true
// acks=all
// retries=Integer.MAX_VALUE
// max.in.flight.requests.per.connection=5
 
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

2. transactional.id

transactional.id는 트랜잭션 Producer의 핵심 식별자입니다:

props.put("transactional.id", "order-service-1");

특징:

  • 고유해야 함: 각 Producer 인스턴스는 고유한 ID 필요
  • 안정적: 재시작 후에도 동일한 ID 사용
  • 펜싱 메커니즘: 같은 ID의 이전 Producer 무효화

주의: 절대 같은 ID를 여러 Producer에서 동시에 사용하면 안 됨!

// ✓ 좋은 예: 인스턴스별 고유 ID
props.put("transactional.id", "order-service-" + instanceId);
 
// ✗ 나쁜 예: 모든 인스턴스가 같은 ID
props.put("transactional.id", "order-service");  // 충돌 발생!

기본 사용법

1. Producer 초기화

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
// 트랜잭션 초기화 (한 번만 호출)
producer.initTransactions();

2. 트랜잭션 실행

try {
    // 트랜잭션 시작
    producer.beginTransaction();
 
    // 메시지 전송
    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
 
    // 트랜잭션 커밋
    producer.commitTransaction();
 
} catch (ProducerFencedException | OutOfOrderSequenceException |
         AuthorizationException e) {
    // 치명적 에러 - Producer 종료 필요
    producer.close();
 
} catch (KafkaException e) {
    // 복구 가능 에러 - 트랜잭션 중단
    producer.abortTransaction();
}

3. 완전한 예제

public class TransactionalProducerExample {
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
        try {
            // 초기화 (첫 실행 시 한 번만)
            producer.initTransactions();
 
            // 여러 트랜잭션 실행 가능
            for (int i = 0; i < 100; i++) {
                try {
                    producer.beginTransaction();
 
                    // 여러 토픽/파티션에 전송
                    producer.send(new ProducerRecord<>("orders",
                        "order-" + i, "order-data-" + i));
                    producer.send(new ProducerRecord<>("inventory",
                        "item-" + i, "inventory-update-" + i));
 
                    producer.commitTransaction();
                    System.out.println("Transaction " + i + " committed");
 
                } catch (ProducerFencedException | OutOfOrderSequenceException |
                         AuthorizationException e) {
                    // 치명적 에러
                    throw e;
 
                } catch (KafkaException e) {
                    // 복구 가능 에러
                    producer.abortTransaction();
                    System.err.println("Transaction " + i + " aborted: " + e.getMessage());
                }
            }
 
        } finally {
            producer.close();
        }
    }
}

Read-Process-Write 패턴

Consumer와 Producer를 트랜잭션으로 연결하여 Exactly-once 처리를 구현합니다.

구현 예제

public class ExactlyOnceProcessor {
 
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
 
    public static void main(String[] args) {
        // Consumer 설정
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "exactly-once-group");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("isolation.level", "read_committed");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
 
        // Producer 설정
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("transactional.id", "exactly-once-processor-1");
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
 
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        producer.initTransactions();
 
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
                if (!records.isEmpty()) {
                    try {
                        producer.beginTransaction();
 
                        // 메시지 처리 및 전송
                        for (ConsumerRecord<String, String> record : records) {
                            String processedValue = processMessage(record.value());
 
                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC,
                                record.key(), processedValue));
                        }
 
                        // 오프셋과 메시지 전송을 원자적으로 커밋
                        producer.sendOffsetsToTransaction(
                            getOffsets(records),
                            consumer.groupMetadata());
 
                        producer.commitTransaction();
 
                    } catch (ProducerFencedException | OutOfOrderSequenceException |
                             AuthorizationException e) {
                        throw e;
 
                    } catch (KafkaException e) {
                        producer.abortTransaction();
                    }
                }
            }
 
        } finally {
            producer.close();
            consumer.close();
        }
    }
 
    private static String processMessage(String value) {
        // 메시지 처리 로직
        return value.toUpperCase();
    }
 
    private static Map<TopicPartition, OffsetAndMetadata> getOffsets(
            ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(offset + 1));
        }
 
        return offsets;
    }
}

핵심 포인트

  1. Consumer 설정:
props.put("enable.auto.commit", "false");  // 수동 커밋
props.put("isolation.level", "read_committed");  // 커밋된 메시지만 읽기
  1. 오프셋을 트랜잭션에 포함:
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
  1. 원자성 보장:
  • 입력 토픽 오프셋 커밋 + 출력 토픽 메시지 전송
  • 둘 다 성공하거나 둘 다 실패

동작 원리

1. Transaction Coordinator

각 Broker는 Transaction Coordinator로 동작할 수 있습니다:

Producer → Transaction Coordinator → __transaction_state 토픽

Transaction Coordinator의 역할:

  • 트랜잭션 상태 관리
  • Producer 펜싱
  • 2-Phase Commit 조정

2. __transaction_state 토픽

트랜잭션 상태를 저장하는 내부 토픽:

Key: transactional.id
Value: {
  transactionId,
  producerId,
  producerEpoch,
  state: ONGOING | PREPARE_COMMIT | COMPLETE_COMMIT | PREPARE_ABORT | COMPLETE_ABORT,
  partitions: [...]
}

3. 2-Phase Commit 프로토콜

Phase 1: Prepare
  Producer → Coordinator: PREPARE_COMMIT
  Coordinator → 모든 참여 파티션: Transaction Marker (BEGIN)

Phase 2: Commit
  모든 파티션 응답 확인
  Coordinator → 모든 참여 파티션: Transaction Marker (COMMIT)
  Coordinator → __transaction_state: COMPLETE_COMMIT

4. Producer Epoch와 Fencing

transactional.id → (Producer ID, Epoch)

예시:
"my-tx-id" → (PID: 100, Epoch: 0)  // 첫 번째 Producer

재시작 후:
"my-tx-id" → (PID: 100, Epoch: 1)  // Epoch 증가

기존 Producer (Epoch: 0)는 펜싱됨 (무효화)

Consumer의 isolation.level

Consumer는 트랜잭션 메시지를 어떻게 읽을지 선택할 수 있습니다.

read_uncommitted (기본값)

props.put("isolation.level", "read_uncommitted");
  • 커밋되지 않은 메시지도 읽음
  • At-least-once semantics
  • 최고 성능

read_committed

props.put("isolation.level", "read_committed");
  • 커밋된 메시지만 읽음
  • Exactly-once semantics
  • 약간의 레이턴시 증가
Timeline:
T1: Producer → Msg A (트랜잭션 중)
T2: Producer → Msg B (트랜잭션 중)
T3: Producer.commit()
T4: Consumer (read_committed) → Msg A, B (이제 읽힘)

vs.

Timeline:
T1: Producer → Msg A (트랜잭션 중)
T1.5: Consumer (read_uncommitted) → Msg A (바로 읽힘)
T2: Producer → Msg B (트랜잭션 중)
T2.5: Consumer (read_uncommitted) → Msg B (바로 읽힘)
T3: Producer.abort()
T4: Consumer는 이미 Msg A, B를 읽음 (중단된 메시지!)

Transaction Markers

각 트랜잭션 메시지에는 마커가 추가됩니다:

Partition:
[Data Msg 1] (TX ID: 1)
[Data Msg 2] (TX ID: 1)
[COMMIT Marker] (TX ID: 1)  ← 트랜잭션 커밋 표시
[Data Msg 3] (TX ID: 2)
[ABORT Marker] (TX ID: 2)   ← 트랜잭션 중단 표시

Consumer는 마커를 읽어 트랜잭션 상태를 파악합니다.

성능 고려사항

오버헤드

트랜잭션은 추가 오버헤드가 있습니다:

  1. 네트워크: Transaction Coordinator와 통신
  2. 디스크: Transaction Markers 저장
  3. 레이턴시: 2-Phase Commit 프로토콜

최적화 전략

1. 트랜잭션 크기 조정

// ✗ 너무 작은 트랜잭션 (비효율적)
for (int i = 0; i < 10000; i++) {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();  // 10000번 커밋!
}
 
// ✓ 적절한 크기의 트랜잭션
producer.beginTransaction();
for (int i = 0; i < 10000; i++) {
    producer.send(record);
}
producer.commitTransaction();  // 1번 커밋

2. 배치 크기 조정

props.put("batch.size", 32768);  // 32KB
props.put("linger.ms", 10);

3. 타임아웃 설정

// 트랜잭션 타임아웃
props.put("transaction.timeout.ms", 60000);  // 60초

에러 처리

1. ProducerFencedException

원인: 같은 transactional.id의 새로운 Producer가 시작됨

처리:

try {
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 현재 Producer는 펜싱됨
    // 재시도 불가 - Producer 종료
    logger.error("Producer fenced, closing", e);
    producer.close();
    System.exit(1);
}

2. OutOfOrderSequenceException

원인: 시퀀스 번호 불일치

처리:

try {
    producer.commitTransaction();
} catch (OutOfOrderSequenceException e) {
    // 심각한 에러 - Producer 종료
    logger.error("Sequence error, closing", e);
    producer.close();
    System.exit(1);
}

3. AuthorizationException

원인: 권한 부족

처리:

try {
    producer.commitTransaction();
} catch (AuthorizationException e) {
    // 권한 문제 - 설정 확인 필요
    logger.error("Authorization failed", e);
    producer.close();
    System.exit(1);
}

4. 복구 가능 에러

try {
    producer.beginTransaction();
    // ... 메시지 전송 ...
    producer.commitTransaction();
 
} catch (ProducerFencedException | OutOfOrderSequenceException |
         AuthorizationException e) {
    // 치명적 에러 - 종료
    producer.close();
    throw e;
 
} catch (KafkaException e) {
    // 복구 가능 - 중단 후 재시도
    logger.warn("Transaction failed, aborting", e);
    producer.abortTransaction();
 
    // 재시도 로직
    retryTransaction();
}

Best Practices

1. 트랜잭션 크기 적절히 유지

// 권장: 100-1000개 메시지
producer.beginTransaction();
for (int i = 0; i < 500; i++) {
    producer.send(records.get(i));
}
producer.commitTransaction();

2. transactional.id 고유성 보장

// 애플리케이션 인스턴스별 고유 ID
String instanceId = System.getenv("INSTANCE_ID");
String transactionalId = "my-app-" + instanceId;
props.put("transactional.id", transactionalId);

3. 적절한 타임아웃 설정

// 트랜잭션이 너무 길지 않도록
props.put("transaction.timeout.ms", 60000);  // 60초
 
// 처리 시간을 고려하여 조정
props.put("max.poll.interval.ms", 300000);  // 5분

4. 멱등성 활용

// 트랜잭션은 자동으로 멱등성 활성화
// enable.idempotence=true 자동 적용

5. 에러 처리 전략

public void processWithRetry(ConsumerRecords<String, String> records) {
    int maxRetries = 3;
    int retries = 0;
 
    while (retries < maxRetries) {
        try {
            producer.beginTransaction();
 
            for (ConsumerRecord<String, String> record : records) {
                processAndSend(record);
            }
 
            producer.sendOffsetsToTransaction(
                getOffsets(records),
                consumer.groupMetadata());
 
            producer.commitTransaction();
            return;  // 성공
 
        } catch (ProducerFencedException | OutOfOrderSequenceException |
                 AuthorizationException e) {
            // 치명적 - 즉시 종료
            throw e;
 
        } catch (KafkaException e) {
            // 복구 가능 - 중단 및 재시도
            producer.abortTransaction();
            retries++;
            logger.warn("Transaction failed, retry {}/{}", retries, maxRetries, e);
 
            if (retries >= maxRetries) {
                throw new RuntimeException("Max retries exceeded", e);
            }
        }
    }
}

Exactly-once 보장 요구사항

완전한 Exactly-once를 위해서는:

  1. Producer: transactional.id 설정
  2. Consumer: isolation.level=read_committed 설정
  3. 처리: Read-Process-Write 패턴 사용
  4. 오프셋: sendOffsetsToTransaction() 사용
// Producer
props.put("transactional.id", "my-tx-id");
 
// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");
 
// 처리
producer.beginTransaction();
// ... 처리 ...
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();

제한사항

1. 단일 클러스터

트랜잭션은 단일 Kafka 클러스터 내에서만 동작합니다.

2. 성능 오버헤드

  • 처리량: 약 10-20% 감소
  • 레이턴시: 약 2-5배 증가

3. 복잡성 증가

  • 코드 복잡도 증가
  • 에러 처리 까다로움
  • 디버깅 어려움

관련 문서