Transactional Producer (트랜잭션 프로듀서)
Transactional Producer는 Kafka에서 Exactly-once semantics를 구현하기 위한 핵심 기능입니다. 여러 파티션에 대한 메시지 전송을 원자적으로 처리할 수 있습니다.
트랜잭션이란?
트랜잭션은 여러 작업을 하나의 논리적 단위로 묶어서 모두 성공하거나 모두 실패하도록 보장합니다.
트랜잭션이 필요한 경우
1. 다중 파티션 원자성
// 주문 생성과 재고 감소를 원자적으로 처리
beginTransaction();
send("orders", order); // 파티션 0
send("inventory", deduction); // 파티션 1
commitTransaction();
// 둘 다 성공하거나 둘 다 실패2. Read-Process-Write 패턴
// Consumer에서 읽고 처리한 후 Producer로 전송
beginTransaction();
records = consumer.poll();
for (record : records) {
result = process(record);
producer.send(result);
}
commitOffsets();
commitTransaction();
// 오프셋 커밋과 메시지 전송이 원자적으로 처리3. Exactly-once 보장
Consumer가 트랜잭션이 완전히 커밋된 메시지만 읽도록 보장합니다.
Transactional Producer 설정
1. Producer 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 트랜잭션 필수 설정
props.put("transactional.id", "my-transactional-id");
// 다음은 자동으로 설정됨
// enable.idempotence=true
// acks=all
// retries=Integer.MAX_VALUE
// max.in.flight.requests.per.connection=5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);2. transactional.id
transactional.id는 트랜잭션 Producer의 핵심 식별자입니다:
props.put("transactional.id", "order-service-1");특징:
- 고유해야 함: 각 Producer 인스턴스는 고유한 ID 필요
- 안정적: 재시작 후에도 동일한 ID 사용
- 펜싱 메커니즘: 같은 ID의 이전 Producer 무효화
주의: 절대 같은 ID를 여러 Producer에서 동시에 사용하면 안 됨!
// ✓ 좋은 예: 인스턴스별 고유 ID
props.put("transactional.id", "order-service-" + instanceId);
// ✗ 나쁜 예: 모든 인스턴스가 같은 ID
props.put("transactional.id", "order-service"); // 충돌 발생!기본 사용법
1. Producer 초기화
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 트랜잭션 초기화 (한 번만 호출)
producer.initTransactions();2. 트랜잭션 실행
try {
// 트랜잭션 시작
producer.beginTransaction();
// 메시지 전송
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
// 트랜잭션 커밋
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// 치명적 에러 - Producer 종료 필요
producer.close();
} catch (KafkaException e) {
// 복구 가능 에러 - 트랜잭션 중단
producer.abortTransaction();
}3. 완전한 예제
public class TransactionalProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 초기화 (첫 실행 시 한 번만)
producer.initTransactions();
// 여러 트랜잭션 실행 가능
for (int i = 0; i < 100; i++) {
try {
producer.beginTransaction();
// 여러 토픽/파티션에 전송
producer.send(new ProducerRecord<>("orders",
"order-" + i, "order-data-" + i));
producer.send(new ProducerRecord<>("inventory",
"item-" + i, "inventory-update-" + i));
producer.commitTransaction();
System.out.println("Transaction " + i + " committed");
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// 치명적 에러
throw e;
} catch (KafkaException e) {
// 복구 가능 에러
producer.abortTransaction();
System.err.println("Transaction " + i + " aborted: " + e.getMessage());
}
}
} finally {
producer.close();
}
}
}Read-Process-Write 패턴
Consumer와 Producer를 트랜잭션으로 연결하여 Exactly-once 처리를 구현합니다.
구현 예제
public class ExactlyOnceProcessor {
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
public static void main(String[] args) {
// Consumer 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "exactly-once-group");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
// Producer 설정
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "exactly-once-processor-1");
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
try {
producer.beginTransaction();
// 메시지 처리 및 전송
for (ConsumerRecord<String, String> record : records) {
String processedValue = processMessage(record.value());
producer.send(new ProducerRecord<>(OUTPUT_TOPIC,
record.key(), processedValue));
}
// 오프셋과 메시지 전송을 원자적으로 커밋
producer.sendOffsetsToTransaction(
getOffsets(records),
consumer.groupMetadata());
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
throw e;
} catch (KafkaException e) {
producer.abortTransaction();
}
}
}
} finally {
producer.close();
consumer.close();
}
}
private static String processMessage(String value) {
// 메시지 처리 로직
return value.toUpperCase();
}
private static Map<TopicPartition, OffsetAndMetadata> getOffsets(
ConsumerRecords<String, String> records) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(offset + 1));
}
return offsets;
}
}핵심 포인트
- Consumer 설정:
props.put("enable.auto.commit", "false"); // 수동 커밋
props.put("isolation.level", "read_committed"); // 커밋된 메시지만 읽기- 오프셋을 트랜잭션에 포함:
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());- 원자성 보장:
- 입력 토픽 오프셋 커밋 + 출력 토픽 메시지 전송
- 둘 다 성공하거나 둘 다 실패
동작 원리
1. Transaction Coordinator
각 Broker는 Transaction Coordinator로 동작할 수 있습니다:
Producer → Transaction Coordinator → __transaction_state 토픽
Transaction Coordinator의 역할:
- 트랜잭션 상태 관리
- Producer 펜싱
- 2-Phase Commit 조정
2. __transaction_state 토픽
트랜잭션 상태를 저장하는 내부 토픽:
Key: transactional.id
Value: {
transactionId,
producerId,
producerEpoch,
state: ONGOING | PREPARE_COMMIT | COMPLETE_COMMIT | PREPARE_ABORT | COMPLETE_ABORT,
partitions: [...]
}
3. 2-Phase Commit 프로토콜
Phase 1: Prepare
Producer → Coordinator: PREPARE_COMMIT
Coordinator → 모든 참여 파티션: Transaction Marker (BEGIN)
Phase 2: Commit
모든 파티션 응답 확인
Coordinator → 모든 참여 파티션: Transaction Marker (COMMIT)
Coordinator → __transaction_state: COMPLETE_COMMIT
4. Producer Epoch와 Fencing
transactional.id → (Producer ID, Epoch)
예시:
"my-tx-id" → (PID: 100, Epoch: 0) // 첫 번째 Producer
재시작 후:
"my-tx-id" → (PID: 100, Epoch: 1) // Epoch 증가
기존 Producer (Epoch: 0)는 펜싱됨 (무효화)
Consumer의 isolation.level
Consumer는 트랜잭션 메시지를 어떻게 읽을지 선택할 수 있습니다.
read_uncommitted (기본값)
props.put("isolation.level", "read_uncommitted");- 커밋되지 않은 메시지도 읽음
- At-least-once semantics
- 최고 성능
read_committed
props.put("isolation.level", "read_committed");- 커밋된 메시지만 읽음
- Exactly-once semantics
- 약간의 레이턴시 증가
Timeline:
T1: Producer → Msg A (트랜잭션 중)
T2: Producer → Msg B (트랜잭션 중)
T3: Producer.commit()
T4: Consumer (read_committed) → Msg A, B (이제 읽힘)
vs.
Timeline:
T1: Producer → Msg A (트랜잭션 중)
T1.5: Consumer (read_uncommitted) → Msg A (바로 읽힘)
T2: Producer → Msg B (트랜잭션 중)
T2.5: Consumer (read_uncommitted) → Msg B (바로 읽힘)
T3: Producer.abort()
T4: Consumer는 이미 Msg A, B를 읽음 (중단된 메시지!)
Transaction Markers
각 트랜잭션 메시지에는 마커가 추가됩니다:
Partition:
[Data Msg 1] (TX ID: 1)
[Data Msg 2] (TX ID: 1)
[COMMIT Marker] (TX ID: 1) ← 트랜잭션 커밋 표시
[Data Msg 3] (TX ID: 2)
[ABORT Marker] (TX ID: 2) ← 트랜잭션 중단 표시
Consumer는 마커를 읽어 트랜잭션 상태를 파악합니다.
성능 고려사항
오버헤드
트랜잭션은 추가 오버헤드가 있습니다:
- 네트워크: Transaction Coordinator와 통신
- 디스크: Transaction Markers 저장
- 레이턴시: 2-Phase Commit 프로토콜
최적화 전략
1. 트랜잭션 크기 조정
// ✗ 너무 작은 트랜잭션 (비효율적)
for (int i = 0; i < 10000; i++) {
producer.beginTransaction();
producer.send(record);
producer.commitTransaction(); // 10000번 커밋!
}
// ✓ 적절한 크기의 트랜잭션
producer.beginTransaction();
for (int i = 0; i < 10000; i++) {
producer.send(record);
}
producer.commitTransaction(); // 1번 커밋2. 배치 크기 조정
props.put("batch.size", 32768); // 32KB
props.put("linger.ms", 10);3. 타임아웃 설정
// 트랜잭션 타임아웃
props.put("transaction.timeout.ms", 60000); // 60초에러 처리
1. ProducerFencedException
원인: 같은 transactional.id의 새로운 Producer가 시작됨
처리:
try {
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 현재 Producer는 펜싱됨
// 재시도 불가 - Producer 종료
logger.error("Producer fenced, closing", e);
producer.close();
System.exit(1);
}2. OutOfOrderSequenceException
원인: 시퀀스 번호 불일치
처리:
try {
producer.commitTransaction();
} catch (OutOfOrderSequenceException e) {
// 심각한 에러 - Producer 종료
logger.error("Sequence error, closing", e);
producer.close();
System.exit(1);
}3. AuthorizationException
원인: 권한 부족
처리:
try {
producer.commitTransaction();
} catch (AuthorizationException e) {
// 권한 문제 - 설정 확인 필요
logger.error("Authorization failed", e);
producer.close();
System.exit(1);
}4. 복구 가능 에러
try {
producer.beginTransaction();
// ... 메시지 전송 ...
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// 치명적 에러 - 종료
producer.close();
throw e;
} catch (KafkaException e) {
// 복구 가능 - 중단 후 재시도
logger.warn("Transaction failed, aborting", e);
producer.abortTransaction();
// 재시도 로직
retryTransaction();
}Best Practices
1. 트랜잭션 크기 적절히 유지
// 권장: 100-1000개 메시지
producer.beginTransaction();
for (int i = 0; i < 500; i++) {
producer.send(records.get(i));
}
producer.commitTransaction();2. transactional.id 고유성 보장
// 애플리케이션 인스턴스별 고유 ID
String instanceId = System.getenv("INSTANCE_ID");
String transactionalId = "my-app-" + instanceId;
props.put("transactional.id", transactionalId);3. 적절한 타임아웃 설정
// 트랜잭션이 너무 길지 않도록
props.put("transaction.timeout.ms", 60000); // 60초
// 처리 시간을 고려하여 조정
props.put("max.poll.interval.ms", 300000); // 5분4. 멱등성 활용
// 트랜잭션은 자동으로 멱등성 활성화
// enable.idempotence=true 자동 적용5. 에러 처리 전략
public void processWithRetry(ConsumerRecords<String, String> records) {
int maxRetries = 3;
int retries = 0;
while (retries < maxRetries) {
try {
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
processAndSend(record);
}
producer.sendOffsetsToTransaction(
getOffsets(records),
consumer.groupMetadata());
producer.commitTransaction();
return; // 성공
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// 치명적 - 즉시 종료
throw e;
} catch (KafkaException e) {
// 복구 가능 - 중단 및 재시도
producer.abortTransaction();
retries++;
logger.warn("Transaction failed, retry {}/{}", retries, maxRetries, e);
if (retries >= maxRetries) {
throw new RuntimeException("Max retries exceeded", e);
}
}
}
}Exactly-once 보장 요구사항
완전한 Exactly-once를 위해서는:
- Producer:
transactional.id설정 - Consumer:
isolation.level=read_committed설정 - 처리: Read-Process-Write 패턴 사용
- 오프셋:
sendOffsetsToTransaction()사용
// Producer
props.put("transactional.id", "my-tx-id");
// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");
// 처리
producer.beginTransaction();
// ... 처리 ...
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();제한사항
1. 단일 클러스터
트랜잭션은 단일 Kafka 클러스터 내에서만 동작합니다.
2. 성능 오버헤드
- 처리량: 약 10-20% 감소
- 레이턴시: 약 2-5배 증가
3. 복잡성 증가
- 코드 복잡도 증가
- 에러 처리 까다로움
- 디버깅 어려움
댓글 (0)