Transactional Messaging
Kafka의 트랜잭션 기능을 사용하여 여러 메시지를 원자적으로 처리하는 방법을 살펴봅니다.
트랜잭션 개요
Kafka 트랜잭션의 특징
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Transaction 특성 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ✓ 원자성 (Atomicity) │
│ - 다중 파티션/토픽에 대한 쓰기가 모두 성공하거나 모두 실패 │
│ │
│ ✓ 격리성 (Isolation) │
│ - read_committed Consumer는 커밋된 메시지만 읽음 │
│ │
│ ✗ 지속성 (Durability) │
│ - 기존 Kafka 복제 메커니즘으로 보장 │
│ │
│ ✗ 일관성 (Consistency) │
│ - 애플리케이션 레벨에서 관리 │
│ │
└─────────────────────────────────────────────────────────────────┘
트랜잭션 지원 범위
| 지원 | 미지원 |
|---|---|
| 다중 파티션 원자적 쓰기 | 읽기 트랜잭션 |
| 다중 토픽 원자적 쓰기 | 외부 시스템 트랜잭션 |
| 오프셋 커밋 포함 | 메시지 삭제/수정 |
Transaction Coordinator
역할
Transaction Coordinator는 트랜잭션을 관리하는 브로커 컴포넌트입니다.
┌─────────────────────────────────────────────────────────────────┐
│ Transaction Coordinator 역할 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Producer ID(PID) 할당 │
│ 2. Transaction 상태 관리 │
│ 3. Transaction Log 유지 (__transaction_state 토픽) │
│ 4. Transaction 타임아웃 처리 │
│ 5. 커밋/중단 마커 쓰기 조정 │
│ │
└─────────────────────────────────────────────────────────────────┘
Coordinator 선택
// transactional.id의 해시로 파티션 결정
int partition = Math.abs(transactionalId.hashCode()) % 50;
// __transaction_state 토픽의 해당 파티션 리더가 Coordinator트랜잭션 상태 머신
상태 전이도
┌─────────────────────────┐
│ │
▼ │
┌───────┐ ┌──────────┐ ┌──────────┐ │
│ Empty │───>│ Ongoing │───>│PrepareCommit│ │
└───────┘ └──────────┘ └──────────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌──────────┐ │
│ │ │CompleteCommit│ │
│ │ └──────────┘ │
│ │ │ │
│ ▼ │ │
│ ┌──────────┐ │ │
│ │PrepareAbort│ │ │
│ └──────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────┐ │ │
│ │CompleteAbort│ │ │
│ └──────────┘ │ │
│ │ │ │
└─────────────┴───────────────┴──────────┘
(재시작)
상태 설명
| 상태 | 설명 |
|---|---|
| Empty | 초기 상태, 트랜잭션 없음 |
| Ongoing | 트랜잭션 진행 중 |
| PrepareCommit | 커밋 준비 중 |
| CompleteCommit | 커밋 완료 |
| PrepareAbort | 중단 준비 중 |
| CompleteAbort | 중단 완료 |
트랜잭션 API 상세
initTransactions()
producer.initTransactions();동작:
- Transaction Coordinator 찾기
- PID와 epoch 할당받기
- 이전 미완료 트랜잭션 정리 (abort)
Producer Transaction Coordinator
│ │
│── FindCoordinator ──────────>│
│<── Coordinator info ─────────│
│ │
│── InitProducerId ───────────>│
│ (transactional.id) │
│<── PID + epoch ──────────────│
│ │
│ [이전 트랜잭션 있으면] │
│── EndTxn(ABORT) ────────────>│
│<── success ──────────────────│
beginTransaction()
producer.beginTransaction();동작:
- 로컬 상태만 변경 (네트워크 호출 없음)
- 트랜잭션 시작 플래그 설정
send() (트랜잭션 내)
producer.send(new ProducerRecord<>("topic", "key", "value"));동작:
- 첫 send 시 파티션을 트랜잭션에 등록
- 메시지 전송 (트랜잭션 마커 포함)
Producer Coordinator Broker
│ │ │
│── AddPartitionsToTxn ────>│ │
│ (topic, partition) │ │
│<── success ───────────────│ │
│ │ │
│───────────────────── Produce ──────────────────────────>│
│ (with PID, epoch, seq) │
│<──────────────────── ack ───────────────────────────────│
sendOffsetsToTransaction()
Consumer 오프셋을 트랜잭션에 포함시킵니다.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
new TopicPartition("input-topic", 0),
new OffsetAndMetadata(100)
);
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());동작:
- 오프셋을 트랜잭션에 등록
- 커밋 시 __consumer_offsets에 기록
commitTransaction()
producer.commitTransaction();동작:
- EndTxn(COMMIT) 요청
- Coordinator가 PrepareCommit 상태로 변경
- 모든 파티션에 COMMIT 마커 쓰기
- CompleteCommit 상태로 변경
Producer Coordinator Brokers
│ │ │
│── EndTxn(COMMIT) ────────>│ │
│ │── PrepareCommit ──────────>│
│ │ (log) │
│ │ │
│ │── WriteTxnMarker(COMMIT) ──>│
│ │ (to all partitions) │
│ │<── success ─────────────────│
│ │ │
│ │── CompleteCommit ─────────>│
│<── success ───────────────│ (log) │
abortTransaction()
producer.abortTransaction();동작:
- commitTransaction()과 유사하지만 ABORT 마커 사용
- Consumer는 ABORT된 메시지 무시
트랜잭션 마커
마커 종류
파티션 내 메시지와 마커:
┌────┬────┬────┬────────┬────┬────┬─────────┬────┬────┐
│ M1 │ M2 │ M3 │COMMIT │ M4 │ M5 │ ABORT │ M6 │ M7 │
│TX-A│TX-A│TX-A│ TX-A │TX-B│TX-B│ TX-B │TX-C│TX-C│
└────┴────┴────┴────────┴────┴────┴─────────┴────┴────┘
↑ ↑
커밋 마커 중단 마커
read_committed Consumer:
- TX-A 메시지 (M1, M2, M3) 읽음
- TX-B 메시지 (M4, M5) 건너뜀
- TX-C 메시지 대기 (마커 없음)
Control Batch
트랜잭션 마커는 Control Batch로 저장됩니다:
// Control Batch 구조
{
"type": "COMMIT" | "ABORT",
"coordinatorEpoch": 1,
"partitionLeaderEpoch": 5
}트랜잭션 격리 수준
read_uncommitted (기본값)
props.put("isolation.level", "read_uncommitted");- 모든 메시지 즉시 읽음
- 트랜잭션 상태 무시
- 높은 처리량
read_committed
props.put("isolation.level", "read_committed");- 커밋된 메시지만 읽음
- ABORT된 메시지 건너뜀
- 미완료 트랜잭션 메시지 대기
Last Stable Offset (LSO)
Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ M8 │ M9 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘
↑ ↑ ↑
│ │ │
LSO Ongoing TX Log End
(offset 0) starts here Offset
read_committed: offset 0 ~ 3만 읽기 가능 (LSO까지)
read_uncommitted: offset 0 ~ 8 모두 읽기 가능
Producer Fencing
개념
같은 transactional.id로 여러 Producer가 실행되는 것을 방지합니다.
Producer A (epoch=0) Coordinator Producer B
│ │ │
│── initTransactions ─────────>│ │
│<── PID=1000, epoch=0 ────────│ │
│ │ │
│── beginTransaction ─────────>│ │
│ │ │
│ │<── initTransactions ────│
│ │ (same tx.id) │
│ │ │
│ │── PID=1000, epoch=1 ───>│
│ │ (epoch 증가) │
│ │ │
│── send ─────────────────────>│ │
│<── ProducerFencedException ──│ (epoch 불일치) │
│ (fenced!) │ │
Fencing 상황
- Producer 재시작: 새 epoch 할당
- 네트워크 파티션 복구: 두 Producer 동시 실행 시도
- 애플리케이션 중복 실행: 같은 tx.id 사용
처리 방법
try {
producer.send(record);
} catch (ProducerFencedException e) {
// 이 Producer는 더 이상 사용 불가
log.error("Producer has been fenced", e);
producer.close();
// 애플리케이션 종료 또는 재시작
System.exit(1);
}트랜잭션 타임아웃
설정
// Producer 설정
props.put("transaction.timeout.ms", "60000"); // 기본 60초
// Broker 설정
// transaction.max.timeout.ms = 900000 (15분, 최대값)타임아웃 동작
Timeline:
│
├── beginTransaction()
│
├── send() ...
│
├── 60초 경과 (처리 지연)
│
├── Coordinator: 트랜잭션 자동 abort
│
└── 다음 API 호출 시 예외 발생
try {
producer.commitTransaction();
} catch (TimeoutException e) {
// 트랜잭션 타임아웃
// 이미 abort 되었을 수 있음
producer.abortTransaction(); // 명시적 abort 시도
}실전 패턴
패턴 1: 단순 트랜잭션
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}패턴 2: 조건부 커밋
producer.initTransactions();
try {
producer.beginTransaction();
for (Order order : orders) {
producer.send(new ProducerRecord<>("orders", order.getId(), order));
}
// 조건 검증
if (validateOrders(orders)) {
producer.commitTransaction();
} else {
producer.abortTransaction();
log.warn("Validation failed, transaction aborted");
}
} catch (Exception e) {
producer.abortTransaction();
throw e;
}패턴 3: Consumer-Producer 파이프라인
producer.initTransactions();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
producer.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// 변환 및 전송
String result = process(record.value());
producer.send(new ProducerRecord<>("output", record.key(), result));
// 오프셋 수집
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// 오프셋을 트랜잭션에 포함
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// Consumer는 다음 poll에서 같은 메시지 다시 받음
}
}패턴 4: Saga 패턴 (분산 트랜잭션)
// Kafka 트랜잭션 + 보상 트랜잭션
producer.initTransactions();
try {
producer.beginTransaction();
// 1. 주문 생성 이벤트
producer.send(new ProducerRecord<>("orders", orderId, orderCreated));
// 2. 재고 예약 이벤트
producer.send(new ProducerRecord<>("inventory", itemId, reserveStock));
// 3. 결제 요청 이벤트
producer.send(new ProducerRecord<>("payments", orderId, paymentRequest));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// Kafka 메시지는 모두 취소됨
// 필요시 보상 트랜잭션 발행
sendCompensation(orderId);
}모니터링
JMX 메트릭
kafka.producer:type=producer-metrics,client-id={client-id}
├── txn-init-time-ns-total # initTransactions 시간
├── txn-begin-time-ns-total # beginTransaction 시간
├── txn-send-offsets-time-ns-total
├── txn-commit-time-ns-total
└── txn-abort-time-ns-total
트랜잭션 상태 확인
# 트랜잭션 상태 조회
kafka-transactions.sh --bootstrap-server localhost:9092 \
--describe --transactional-id my-tx-id미완료 트랜잭션 처리
# 미완료 트랜잭션 강제 abort
kafka-transactions.sh --bootstrap-server localhost:9092 \
--abort --transactional-id my-tx-idBest Practices
1. transactional.id 관리
// 인스턴스별 고유 ID
String transactionalId = "order-processor-" + instanceId;
props.put("transactional.id", transactionalId);2. 적절한 타임아웃
// 처리 시간에 맞는 타임아웃
props.put("transaction.timeout.ms", "120000"); // 2분3. 재시도 전략
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
producer.beginTransaction();
// ...
producer.commitTransaction();
break;
} catch (RetriableException e) {
producer.abortTransaction();
retryCount++;
Thread.sleep(1000 * retryCount);
}
}4. 에러 처리
try {
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 복구 불가 - 종료
producer.close();
throw e;
} catch (OutOfOrderSequenceException e) {
// 복구 불가 - 종료
producer.close();
throw e;
} catch (KafkaException e) {
// 중단 시도 후 재시도 가능
producer.abortTransaction();
}
댓글 (0)