Transactional Messaging

Kafka의 트랜잭션 기능을 사용하여 여러 메시지를 원자적으로 처리하는 방법을 살펴봅니다.

트랜잭션 개요

Kafka 트랜잭션의 특징

┌─────────────────────────────────────────────────────────────────┐
│                   Kafka Transaction 특성                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ✓ 원자성 (Atomicity)                                           │
│    - 다중 파티션/토픽에 대한 쓰기가 모두 성공하거나 모두 실패        │
│                                                                 │
│  ✓ 격리성 (Isolation)                                           │
│    - read_committed Consumer는 커밋된 메시지만 읽음               │
│                                                                 │
│  ✗ 지속성 (Durability)                                          │
│    - 기존 Kafka 복제 메커니즘으로 보장                             │
│                                                                 │
│  ✗ 일관성 (Consistency)                                         │
│    - 애플리케이션 레벨에서 관리                                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

트랜잭션 지원 범위

지원미지원
다중 파티션 원자적 쓰기읽기 트랜잭션
다중 토픽 원자적 쓰기외부 시스템 트랜잭션
오프셋 커밋 포함메시지 삭제/수정

Transaction Coordinator

역할

Transaction Coordinator는 트랜잭션을 관리하는 브로커 컴포넌트입니다.

┌─────────────────────────────────────────────────────────────────┐
│                  Transaction Coordinator 역할                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Producer ID(PID) 할당                                        │
│  2. Transaction 상태 관리                                        │
│  3. Transaction Log 유지 (__transaction_state 토픽)              │
│  4. Transaction 타임아웃 처리                                    │
│  5. 커밋/중단 마커 쓰기 조정                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Coordinator 선택

// transactional.id의 해시로 파티션 결정
int partition = Math.abs(transactionalId.hashCode()) % 50;
// __transaction_state 토픽의 해당 파티션 리더가 Coordinator

트랜잭션 상태 머신

상태 전이도

                    ┌─────────────────────────┐
                    │                         │
                    ▼                         │
┌───────┐    ┌──────────┐    ┌──────────┐    │
│ Empty │───>│ Ongoing  │───>│PrepareCommit│  │
└───────┘    └──────────┘    └──────────┘    │
    │             │               │          │
    │             │               ▼          │
    │             │         ┌──────────┐     │
    │             │         │CompleteCommit│  │
    │             │         └──────────┘     │
    │             │               │          │
    │             ▼               │          │
    │       ┌──────────┐          │          │
    │       │PrepareAbort│        │          │
    │       └──────────┘          │          │
    │             │               │          │
    │             ▼               │          │
    │       ┌──────────┐          │          │
    │       │CompleteAbort│       │          │
    │       └──────────┘          │          │
    │             │               │          │
    └─────────────┴───────────────┴──────────┘
                    (재시작)

상태 설명

상태설명
Empty초기 상태, 트랜잭션 없음
Ongoing트랜잭션 진행 중
PrepareCommit커밋 준비 중
CompleteCommit커밋 완료
PrepareAbort중단 준비 중
CompleteAbort중단 완료

트랜잭션 API 상세

initTransactions()

producer.initTransactions();

동작:

  1. Transaction Coordinator 찾기
  2. PID와 epoch 할당받기
  3. 이전 미완료 트랜잭션 정리 (abort)
Producer                    Transaction Coordinator
   │                              │
   │── FindCoordinator ──────────>│
   │<── Coordinator info ─────────│
   │                              │
   │── InitProducerId ───────────>│
   │   (transactional.id)         │
   │<── PID + epoch ──────────────│
   │                              │
   │   [이전 트랜잭션 있으면]       │
   │── EndTxn(ABORT) ────────────>│
   │<── success ──────────────────│

beginTransaction()

producer.beginTransaction();

동작:

  • 로컬 상태만 변경 (네트워크 호출 없음)
  • 트랜잭션 시작 플래그 설정

send() (트랜잭션 내)

producer.send(new ProducerRecord<>("topic", "key", "value"));

동작:

  1. 첫 send 시 파티션을 트랜잭션에 등록
  2. 메시지 전송 (트랜잭션 마커 포함)
Producer                    Coordinator                    Broker
   │                           │                             │
   │── AddPartitionsToTxn ────>│                             │
   │   (topic, partition)      │                             │
   │<── success ───────────────│                             │
   │                           │                             │
   │───────────────────── Produce ──────────────────────────>│
   │                     (with PID, epoch, seq)              │
   │<──────────────────── ack ───────────────────────────────│

sendOffsetsToTransaction()

Consumer 오프셋을 트랜잭션에 포함시킵니다.

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
    new TopicPartition("input-topic", 0),
    new OffsetAndMetadata(100)
);
 
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

동작:

  1. 오프셋을 트랜잭션에 등록
  2. 커밋 시 __consumer_offsets에 기록

commitTransaction()

producer.commitTransaction();

동작:

  1. EndTxn(COMMIT) 요청
  2. Coordinator가 PrepareCommit 상태로 변경
  3. 모든 파티션에 COMMIT 마커 쓰기
  4. CompleteCommit 상태로 변경
Producer                    Coordinator                    Brokers
   │                           │                             │
   │── EndTxn(COMMIT) ────────>│                             │
   │                           │── PrepareCommit ──────────>│
   │                           │                   (log)     │
   │                           │                             │
   │                           │── WriteTxnMarker(COMMIT) ──>│
   │                           │   (to all partitions)       │
   │                           │<── success ─────────────────│
   │                           │                             │
   │                           │── CompleteCommit ─────────>│
   │<── success ───────────────│                   (log)     │

abortTransaction()

producer.abortTransaction();

동작:

  • commitTransaction()과 유사하지만 ABORT 마커 사용
  • Consumer는 ABORT된 메시지 무시

트랜잭션 마커

마커 종류

파티션 내 메시지와 마커:

┌────┬────┬────┬────────┬────┬────┬─────────┬────┬────┐
│ M1 │ M2 │ M3 │COMMIT  │ M4 │ M5 │ ABORT   │ M6 │ M7 │
│TX-A│TX-A│TX-A│ TX-A   │TX-B│TX-B│  TX-B   │TX-C│TX-C│
└────┴────┴────┴────────┴────┴────┴─────────┴────┴────┘
                   ↑                   ↑
              커밋 마커            중단 마커

read_committed Consumer:
- TX-A 메시지 (M1, M2, M3) 읽음
- TX-B 메시지 (M4, M5) 건너뜀
- TX-C 메시지 대기 (마커 없음)

Control Batch

트랜잭션 마커는 Control Batch로 저장됩니다:

// Control Batch 구조
{
    "type": "COMMIT" | "ABORT",
    "coordinatorEpoch": 1,
    "partitionLeaderEpoch": 5
}

트랜잭션 격리 수준

read_uncommitted (기본값)

props.put("isolation.level", "read_uncommitted");
  • 모든 메시지 즉시 읽음
  • 트랜잭션 상태 무시
  • 높은 처리량

read_committed

props.put("isolation.level", "read_committed");
  • 커밋된 메시지만 읽음
  • ABORT된 메시지 건너뜀
  • 미완료 트랜잭션 메시지 대기

Last Stable Offset (LSO)

Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ M8 │ M9 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘
  ↑                   ↑                   ↑
  │                   │                   │
 LSO            Ongoing TX           Log End
(offset 0)      starts here         Offset

read_committed: offset 0 ~ 3만 읽기 가능 (LSO까지)
read_uncommitted: offset 0 ~ 8 모두 읽기 가능

Producer Fencing

개념

같은 transactional.id로 여러 Producer가 실행되는 것을 방지합니다.

Producer A (epoch=0)              Coordinator              Producer B
       │                              │                         │
       │── initTransactions ─────────>│                         │
       │<── PID=1000, epoch=0 ────────│                         │
       │                              │                         │
       │── beginTransaction ─────────>│                         │
       │                              │                         │
       │                              │<── initTransactions ────│
       │                              │    (same tx.id)         │
       │                              │                         │
       │                              │── PID=1000, epoch=1 ───>│
       │                              │   (epoch 증가)           │
       │                              │                         │
       │── send ─────────────────────>│                         │
       │<── ProducerFencedException ──│  (epoch 불일치)         │
       │   (fenced!)                  │                         │

Fencing 상황

  1. Producer 재시작: 새 epoch 할당
  2. 네트워크 파티션 복구: 두 Producer 동시 실행 시도
  3. 애플리케이션 중복 실행: 같은 tx.id 사용

처리 방법

try {
    producer.send(record);
} catch (ProducerFencedException e) {
    // 이 Producer는 더 이상 사용 불가
    log.error("Producer has been fenced", e);
    producer.close();
 
    // 애플리케이션 종료 또는 재시작
    System.exit(1);
}

트랜잭션 타임아웃

설정

// Producer 설정
props.put("transaction.timeout.ms", "60000");  // 기본 60초
 
// Broker 설정
// transaction.max.timeout.ms = 900000 (15분, 최대값)

타임아웃 동작

Timeline:
│
├── beginTransaction()
│
├── send() ...
│
├── 60초 경과 (처리 지연)
│
├── Coordinator: 트랜잭션 자동 abort
│
└── 다음 API 호출 시 예외 발생
try {
    producer.commitTransaction();
} catch (TimeoutException e) {
    // 트랜잭션 타임아웃
    // 이미 abort 되었을 수 있음
    producer.abortTransaction();  // 명시적 abort 시도
}

실전 패턴

패턴 1: 단순 트랜잭션

producer.initTransactions();
 
try {
    producer.beginTransaction();
 
    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
 
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

패턴 2: 조건부 커밋

producer.initTransactions();
 
try {
    producer.beginTransaction();
 
    for (Order order : orders) {
        producer.send(new ProducerRecord<>("orders", order.getId(), order));
    }
 
    // 조건 검증
    if (validateOrders(orders)) {
        producer.commitTransaction();
    } else {
        producer.abortTransaction();
        log.warn("Validation failed, transaction aborted");
    }
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}

패턴 3: Consumer-Producer 파이프라인

producer.initTransactions();
 
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
    if (records.isEmpty()) continue;
 
    try {
        producer.beginTransaction();
 
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
        for (ConsumerRecord<String, String> record : records) {
            // 변환 및 전송
            String result = process(record.value());
            producer.send(new ProducerRecord<>("output", record.key(), result));
 
            // 오프셋 수집
            offsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
            );
        }
 
        // 오프셋을 트랜잭션에 포함
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
 
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        // Consumer는 다음 poll에서 같은 메시지 다시 받음
    }
}

패턴 4: Saga 패턴 (분산 트랜잭션)

// Kafka 트랜잭션 + 보상 트랜잭션
producer.initTransactions();
 
try {
    producer.beginTransaction();
 
    // 1. 주문 생성 이벤트
    producer.send(new ProducerRecord<>("orders", orderId, orderCreated));
 
    // 2. 재고 예약 이벤트
    producer.send(new ProducerRecord<>("inventory", itemId, reserveStock));
 
    // 3. 결제 요청 이벤트
    producer.send(new ProducerRecord<>("payments", orderId, paymentRequest));
 
    producer.commitTransaction();
 
} catch (Exception e) {
    producer.abortTransaction();
    // Kafka 메시지는 모두 취소됨
 
    // 필요시 보상 트랜잭션 발행
    sendCompensation(orderId);
}

모니터링

JMX 메트릭

kafka.producer:type=producer-metrics,client-id={client-id}
├── txn-init-time-ns-total    # initTransactions 시간
├── txn-begin-time-ns-total   # beginTransaction 시간
├── txn-send-offsets-time-ns-total
├── txn-commit-time-ns-total
└── txn-abort-time-ns-total

트랜잭션 상태 확인

# 트랜잭션 상태 조회
kafka-transactions.sh --bootstrap-server localhost:9092 \
    --describe --transactional-id my-tx-id

미완료 트랜잭션 처리

# 미완료 트랜잭션 강제 abort
kafka-transactions.sh --bootstrap-server localhost:9092 \
    --abort --transactional-id my-tx-id

Best Practices

1. transactional.id 관리

// 인스턴스별 고유 ID
String transactionalId = "order-processor-" + instanceId;
props.put("transactional.id", transactionalId);

2. 적절한 타임아웃

// 처리 시간에 맞는 타임아웃
props.put("transaction.timeout.ms", "120000");  // 2분

3. 재시도 전략

int maxRetries = 3;
int retryCount = 0;
 
while (retryCount < maxRetries) {
    try {
        producer.beginTransaction();
        // ...
        producer.commitTransaction();
        break;
    } catch (RetriableException e) {
        producer.abortTransaction();
        retryCount++;
        Thread.sleep(1000 * retryCount);
    }
}

4. 에러 처리

try {
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 복구 불가 - 종료
    producer.close();
    throw e;
} catch (OutOfOrderSequenceException e) {
    // 복구 불가 - 종료
    producer.close();
    throw e;
} catch (KafkaException e) {
    // 중단 시도 후 재시도 가능
    producer.abortTransaction();
}

관련 문서