Exactly-once Semantics 구현
Kafka에서 Exactly-once Semantics(EOS)를 구현하는 방법과 내부 동작 원리를 살펴봅니다.
Exactly-once의 범위
EOS가 적용되는 범위
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Exactly-once 범위 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────────┐ ┌──────────┐ │
│ │ Producer │────>│ Kafka Cluster │────>│ Consumer │ │
│ └──────────┘ └──────────────────┘ └──────────┘ │
│ │ │ │ │
│ └────────────────────┴────────────────────┘ │
│ Kafka 내부에서 EOS 보장 │
│ │
├─────────────────────────────────────────────────────────────────┤
│ 외부 시스템과의 EOS는 별도 구현 필요 │
│ - 데이터베이스: 트랜잭션 + 오프셋 저장 │
│ - 파일 시스템: 원자적 쓰기 + 체크포인트 │
└─────────────────────────────────────────────────────────────────┘
EOS 구현 단계
단계 기능 도입 버전 Idempotent Producer Producer→Broker 중복 방지 Kafka 0.11 Transactional API 다중 파티션 원자적 쓰기 Kafka 0.11 Exactly-once Streams Kafka Streams EOS Kafka 0.11 EOS v2 개선된 트랜잭션 프로토콜 Kafka 2.5
Idempotent Producer 구현
동작 원리
┌─────────────┐ ┌─────────────┐
│ Producer │ │ Broker │
│ │ │ │
│ PID: 1000 │ │ │
│ Seq: 0,1,2 │ │ │
└─────────────┘ └─────────────┘
│ │
│─── (PID=1000, Seq=0, msg1) ───────>│
│<── ACK ────────────────────────────│
│ │
│─── (PID=1000, Seq=1, msg2) ───────>│
│ (네트워크 타임아웃, ACK 손실) │
│ │
│─── (PID=1000, Seq=1, msg2) ───────>│ 재전송
│ Broker: "Seq=1 이미 있음, 무시" │ → 중복 방지
│<── ACK ────────────────────────────│
Producer ID (PID)와 Sequence Number
PID : Producer 인스턴스별 고유 ID (브로커가 할당)
Sequence Number : 메시지별 순차 번호 (파티션별로 관리)
조합 : (PID, Partition, Sequence) → 중복 감지
설정 방법
Properties props = new Properties ();
props. put ( "bootstrap.servers" , "localhost:9092" );
props. put ( "key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
props. put ( "value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
// Idempotent Producer 활성화
props. put ( "enable.idempotence" , "true" );
// 자동으로 적용되는 설정:
// - acks = all
// - retries = Integer.MAX_VALUE
// - max.in.flight.requests.per.connection <= 5
KafkaProducer< String , String > producer = new KafkaProducer<>(props);
제한사항
Idempotent Producer의 한계:
├── 단일 Producer 세션 내에서만 보장
├── Producer 재시작 시 새 PID 할당 → 이전 중복 감지 불가
├── 단일 파티션에 대한 쓰기만 보장
└── 다중 파티션 원자성 미보장
Transactional Producer 구현
트랜잭션 흐름
┌──────────────────────────────────────────────────────────────────┐
│ Transaction Flow │
├──────────────────────────────────────────────────────────────────┤
│ │
│ Producer Transaction Broker │
│ │ Coordinator │ │
│ │ │ │ │
│ │── initTransactions ─>│ │ │
│ │<── PID assigned ─────│ │ │
│ │ │ │ │
│ │── beginTransaction ─>│ │ │
│ │ │ │ │
│ │────────────────────── send(topic1) ──────>│ │
│ │────────────────────── send(topic2) ──────>│ │
│ │ │ │ │
│ │── commitTransaction ─> │ │
│ │ │── write markers ──>│ │
│ │<── committed ────────│ │ │
│ │
└──────────────────────────────────────────────────────────────────┘
전체 구현 예제
Properties props = new Properties ();
props. put ( "bootstrap.servers" , "localhost:9092" );
props. put ( "key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
props. put ( "value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
// 트랜잭션 설정
props. put ( "transactional.id" , "order-processing-tx-1" ); // 필수: 고유 ID
props. put ( "enable.idempotence" , "true" ); // 자동 활성화됨
KafkaProducer< String , String > producer = new KafkaProducer<>(props);
// 트랜잭션 초기화 (Producer 시작 시 1회)
producer. initTransactions ();
try {
// 트랜잭션 시작
producer. beginTransaction ();
// 여러 토픽/파티션에 원자적 쓰기
producer. send ( new ProducerRecord<>( "orders" , "order-123" , "created" ));
producer. send ( new ProducerRecord<>( "inventory" , "item-456" , "reserved" ));
producer. send ( new ProducerRecord<>( "notifications" , "user-789" , "order placed" ));
// 모든 메시지가 성공적으로 커밋됨
producer. commitTransaction ();
} catch (ProducerFencedException e ) {
// 다른 Producer가 같은 transactional.id로 시작됨
producer. close ();
throw e;
} catch (KafkaException e ) {
// 트랜잭션 실패 - 롤백
producer. abortTransaction ();
throw e;
}
transactional.id의 역할
// transactional.id는 Producer 인스턴스를 식별
props. put ( "transactional.id" , "my-app-instance-1" );
동작 방식 :
같은 transactional.id로 새 Producer 시작 시 이전 Producer는 “fenced”
미완료 트랜잭션 자동 롤백
Producer 재시작 후에도 트랜잭션 일관성 유지
Producer A (transactional.id = "tx-1")
│
│── beginTransaction
│── send messages...
│ (장애 발생)
│
▼
Producer B (transactional.id = "tx-1") 시작
│
│── initTransactions
│ └── Producer A fenced
│ └── A의 미완료 트랜잭션 abort
│
└── 정상 트랜잭션 시작
Consumer의 Exactly-once 읽기
read_committed 격리 수준
Properties props = new Properties ();
props. put ( "bootstrap.servers" , "localhost:9092" );
props. put ( "group.id" , "exactly-once-consumer" );
// 커밋된 메시지만 읽음
props. put ( "isolation.level" , "read_committed" );
// 수동 커밋 사용
props. put ( "enable.auto.commit" , "false" );
KafkaConsumer< String , String > consumer = new KafkaConsumer<>(props);
isolation.level 비교
설정 동작 사용 시나리오 read_uncommitted모든 메시지 읽음 (기본값) 트랜잭션 미사용 read_committed커밋된 메시지만 읽음 EOS 구현 시
read_committed 동작 방식
Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ M8 │
└────┴────┴────┴────┴────┴────┴────┴────┘
↑ ↑ ↑ ↑
│ │ │ │
Committed In-progress Committed Pending
TX-A TX-B TX-A TX-B
read_uncommitted: M1 ~ M8 모두 읽음
read_committed: M1, M3, M4, M5 읽음 (TX-A만)
TX-B 커밋 전까지 M2, M6~M8 대기
개념
입력 토픽의 메시지를 처리하고 출력 토픽에 쓰는 패턴입니다.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Input Topic │────>│ Processor │────>│Output Topic │
└─────────────┘ └─────────────┘ └─────────────┘
│
│ Exactly-once:
│ - Consumer offset 커밋
│ - Output 메시지 쓰기
│ - 둘 다 같은 트랜잭션
구현 예제
// Producer 설정
Properties producerProps = new Properties ();
producerProps. put ( "bootstrap.servers" , "localhost:9092" );
producerProps. put ( "transactional.id" , "consume-transform-produce-1" );
// Consumer 설정
Properties consumerProps = new Properties ();
consumerProps. put ( "bootstrap.servers" , "localhost:9092" );
consumerProps. put ( "group.id" , "ctp-group" );
consumerProps. put ( "isolation.level" , "read_committed" );
consumerProps. put ( "enable.auto.commit" , "false" );
KafkaProducer< String , String > producer = new KafkaProducer<>(producerProps);
KafkaConsumer< String , String > consumer = new KafkaConsumer<>(consumerProps);
producer. initTransactions ();
consumer. subscribe (Arrays. asList ( "input-topic" ));
while ( true ) {
ConsumerRecords< String , String > records = consumer. poll (Duration. ofMillis ( 100 ));
if ( ! records. isEmpty ()) {
producer. beginTransaction ();
try {
for (ConsumerRecord< String , String > record : records) {
// 변환 처리
String transformedValue = transform (record. value ());
// 출력 토픽에 쓰기
producer. send ( new ProducerRecord<>(
"output-topic" ,
record. key (),
transformedValue
));
}
// Consumer 오프셋을 트랜잭션에 포함
Map< TopicPartition , OffsetAndMetadata > offsets = new HashMap<>();
for (TopicPartition partition : records. partitions ()) {
List<ConsumerRecord< String , String >> partitionRecords =
records. records (partition);
long lastOffset = partitionRecords
. get (partitionRecords. size () - 1 ). offset ();
offsets. put (partition, new OffsetAndMetadata (lastOffset + 1 ));
}
// 오프셋 커밋을 트랜잭션에 포함
producer. sendOffsetsToTransaction (offsets, consumer. groupMetadata ());
// 커밋: 메시지 쓰기 + 오프셋 커밋이 원자적으로 처리
producer. commitTransaction ();
} catch (Exception e ) {
producer. abortTransaction ();
throw e;
}
}
}
Kafka Streams의 EOS
설정 방법
Properties props = new Properties ();
props. put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
props. put (StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-streams" );
// EOS 활성화 (Kafka 2.5+)
props. put (StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Kafka 0.11 ~ 2.4
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
// StreamsConfig.EXACTLY_ONCE);
StreamsBuilder builder = new StreamsBuilder ();
// 스트림 처리 토폴로지
builder. < String, String > stream ( "input-topic" )
. mapValues (value -> processValue (value))
. to ( "output-topic" );
KafkaStreams streams = new KafkaStreams (builder. build (), props);
streams. start ();
EXACTLY_ONCE vs EXACTLY_ONCE_V2
특성 EXACTLY_ONCE (v1) EXACTLY_ONCE_V2 도입 버전 Kafka 0.11 Kafka 2.5 트랜잭션 ID 파티션별 태스크별 오버헤드 높음 낮음 확장성 제한적 개선됨 권장 사용 레거시 호환 새 애플리케이션
Kafka Streams EOS 내부 동작
┌────────────────────────────────────────────────────────────────┐
│ Kafka Streams EOS │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. Consumer가 input-topic에서 메시지 읽음 │
│ ↓ │
│ 2. Processor가 메시지 처리 (상태 저장소 업데이트 포함) │
│ ↓ │
│ 3. Transaction 시작 │
│ - Output 메시지 쓰기 │
│ - 상태 저장소 변경 (changelog topic) │
│ - Consumer 오프셋 커밋 │
│ ↓ │
│ 4. Transaction 커밋 (all or nothing) │
│ │
└────────────────────────────────────────────────────────────────┘
외부 시스템과의 EOS
데이터베이스 연동
public class ExactlyOnceToDatabase {
private final KafkaConsumer< String , String > consumer;
private final DataSource dataSource;
public void process () {
while ( true ) {
ConsumerRecords< String , String > records =
consumer. poll (Duration. ofMillis ( 100 ));
for (ConsumerRecord< String , String > record : records) {
Connection conn = dataSource. getConnection ();
try {
conn. setAutoCommit ( false );
// 1. 비즈니스 데이터 저장
saveBusinessData (conn, record);
// 2. 오프셋 저장 (같은 트랜잭션)
saveOffset (conn, record. topic (),
record. partition (), record. offset ());
conn. commit ();
} catch (Exception e ) {
conn. rollback ();
throw e;
} finally {
conn. close ();
}
}
// Kafka에는 커밋하지 않음 - DB가 source of truth
}
}
// Consumer 재시작 시 DB에서 오프셋 복원
public void initializeOffsets () {
for (TopicPartition partition : consumer. assignment ()) {
Long savedOffset = getOffsetFromDB (
partition. topic (), partition. partition ());
if (savedOffset != null ) {
consumer. seek (partition, savedOffset + 1 );
}
}
}
}
Outbox 패턴
// 1. 비즈니스 로직과 이벤트를 하나의 DB 트랜잭션으로
@ Transactional
public void processOrder (Order order) {
// 비즈니스 데이터 저장
orderRepository. save (order);
// Outbox 테이블에 이벤트 저장 (같은 트랜잭션)
outboxRepository. save ( new OutboxEvent (
"orders" ,
order. getId (),
toJson (order)
));
}
// 2. 별도 프로세스가 Outbox → Kafka 발행
@ Scheduled ( fixedRate = 1000 )
public void publishOutboxEvents () {
List< OutboxEvent > events = outboxRepository. findUnpublished ();
for (OutboxEvent event : events) {
producer. send ( new ProducerRecord<>(
event. getTopic (),
event. getKey (),
event. getPayload ()
)). get ();
event. markAsPublished ();
outboxRepository. save (event);
}
}
성능 고려사항
트랜잭션 오버헤드
Non-transactional:
┌────────────────────────────────────────┐
│ send → ack → send → ack → send → ack │
└────────────────────────────────────────┘
Transactional:
┌──────────────────────────────────────────────────────┐
│ begin → send → send → send → prepare → commit → ack │
│ ↑ │
│ 추가 오버헤드 │
└──────────────────────────────────────────────────────┘
최적화 전략
// 1. 배치 크기 증가
props. put ( "batch.size" , "65536" ); // 64KB
props. put ( "linger.ms" , "10" );
// 2. 트랜잭션당 메시지 수 증가
int batchCount = 0 ;
producer. beginTransaction ();
for (ConsumerRecord< String , String > record : records) {
producer. send (...);
batchCount ++ ;
// 1000개마다 커밋
if (batchCount >= 1000 ) {
producer. commitTransaction ();
producer. beginTransaction ();
batchCount = 0 ;
}
}
producer. commitTransaction ();
성능 비교
모드 상대 처리량 레이턴시 Fire-and-forget 100% 최저 At-least-once 80-90% 낮음 Exactly-once 50-70% 중간
장애 처리
ProducerFencedException
try {
producer. beginTransaction ();
// ...
producer. commitTransaction ();
} catch (ProducerFencedException e ) {
// 같은 transactional.id로 다른 Producer가 시작됨
// 이 Producer는 더 이상 사용할 수 없음
log. error ( "Producer fenced, shutting down" , e);
producer. close ();
System. exit ( 1 );
}
트랜잭션 타임아웃
// 트랜잭션 타임아웃 설정
props. put ( "transaction.timeout.ms" , "60000" ); // 기본 60초
// 타임아웃 초과 시 트랜잭션 자동 abort
// → 다음 beginTransaction에서 실패
관련 문서
댓글 (0)