Offset 관리
Offset은 Kafka에서 Consumer가 메시지를 어디까지 읽었는지 추적하는 핵심 메커니즘입니다.
Offset 개념
정의
Offset은 파티션 내에서 각 메시지의 고유한 순차적 ID입니다.
Partition 0:
┌────┬────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ ← Offset
└────┴────┴────┴────┴────┴────┴────┘
↑ ↑
Committed Current
Offset Position
Offset 종류
| Offset 종류 | 설명 |
|---|---|
| Log End Offset | 파티션의 마지막 메시지 위치 |
| Current Position | Consumer가 현재 읽고 있는 위치 |
| Committed Offset | Consumer Group이 커밋한 마지막 위치 |
| Consumer Lag | Log End Offset - Committed Offset |
Offset 저장 위치
__consumer_offsets 토픽
Kafka는 Consumer Group의 오프셋을 __consumer_offsets 내부 토픽에 저장합니다.
Key: (group.id, topic, partition)
Value: (offset, metadata, timestamp)
특징:
- 기본 50개 파티션
- Compacted 토픽 (최신 값만 유지)
- 고가용성 보장 (replication factor 설정 가능)
Coordinator를 통한 조회
Consumer → FindCoordinator → Group Coordinator
→ OffsetFetch → __consumer_offsets 파티션 리더
← 커밋된 오프셋 반환
Auto Commit vs Manual Commit
Auto Commit (자동 커밋)
설정
props.put("enable.auto.commit", "true"); // 기본값
props.put("auto.commit.interval.ms", "5000"); // 5초마다 커밋동작 방식
poll() 호출 시 자동으로 이전에 받은 오프셋을 커밋
Timeline:
poll() ─────── process ─────── poll() ─────── process ─────── poll()
↑
auto commit 발생
(이전 poll()의 오프셋)
장점
- 구현 간단
- 코드 복잡도 낮음
단점
- 메시지 중복 가능 (At-least-once)
- 메시지 손실 가능 (처리 전 커밋)
- 정밀한 제어 불가
중복 발생 시나리오
1. poll() → offset 100-200 수신
2. 메시지 처리 중... (offset 150까지 처리)
3. Consumer 크래시!
4. 재시작 → 마지막 커밋 offset 100부터 재처리
5. offset 100-150 중복 처리됨
Manual Commit (수동 커밋)
설정
props.put("enable.auto.commit", "false");Synchronous Commit (동기 커밋)
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 모든 메시지 처리 후 동기 커밋
consumer.commitSync();
}
} catch (CommitFailedException e) {
log.error("Commit failed", e);
}특징:
- 커밋 완료까지 블로킹
- 커밋 실패 시 예외 발생
- 가장 안전한 방법
Asynchronous Commit (비동기 커밋)
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 비동기 커밋
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed for offsets: " + offsets, exception);
}
});
}특징:
- 논블로킹
- 재시도 없음 (순서 문제 방지)
- 더 높은 처리량
동기 + 비동기 조합 (권장)
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 일반적으로 비동기 커밋 (높은 처리량)
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Error in poll loop", e);
} finally {
try {
// 종료 시 동기 커밋 (안전한 종료)
consumer.commitSync();
} finally {
consumer.close();
}
}Commit 방식 비교
| 특성 | Auto Commit | Sync Commit | Async Commit |
|---|---|---|---|
| 구현 난이도 | 쉬움 | 중간 | 중간 |
| 처리량 | 높음 | 낮음 | 높음 |
| 신뢰성 | 낮음 | 높음 | 중간 |
| 중복 가능성 | 있음 | 낮음 | 있음 |
| 재시도 | 없음 | 자동 | 없음 |
특정 Offset 커밋
레코드별 커밋
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
// 현재 레코드의 다음 오프셋을 저장
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1) // 다음에 읽을 오프셋
);
// N개마다 커밋
if (count % 100 == 0) {
consumer.commitAsync(currentOffsets, null);
}
}파티션별 커밋
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
// 파티션별 마지막 오프셋 + 1 커밋
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
consumer.commitSync(offsets);Offset 탐색 (Seek)
특정 Offset으로 이동
TopicPartition partition = new TopicPartition("my-topic", 0);
// assign 사용 시
consumer.assign(Arrays.asList(partition));
// 특정 오프셋으로 이동
consumer.seek(partition, 100);
// 처음으로 이동
consumer.seekToBeginning(Arrays.asList(partition));
// 끝으로 이동
consumer.seekToEnd(Arrays.asList(partition));타임스탬프 기반 탐색
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(partition, System.currentTimeMillis() - 3600000); // 1시간 전
// 타임스탬프에 해당하는 오프셋 조회
Map<TopicPartition, OffsetAndTimestamp> offsets =
consumer.offsetsForTimes(timestampsToSearch);
// 해당 오프셋으로 이동
OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
if (offsetAndTimestamp != null) {
consumer.seek(partition, offsetAndTimestamp.offset());
}활용 사례
1. 장애 복구
// 특정 시점부터 재처리
long failureTime = getFailureTimestamp();
Map<TopicPartition, Long> timestamps = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
timestamps.put(tp, failureTime);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
// seek to offsets...2. 데이터 재처리
// 모든 파티션을 처음부터 재처리
consumer.seekToBeginning(consumer.assignment());3. 최신 데이터만 처리
// 현재 시점부터 새로운 메시지만 처리
consumer.seekToEnd(consumer.assignment());Offset 초기화 정책
auto.offset.reset
Consumer Group이 오프셋을 찾지 못할 때의 동작을 정의합니다.
props.put("auto.offset.reset", "latest"); // 기본값| 값 | 동작 |
|---|---|
| latest | 가장 최신 오프셋부터 시작 |
| earliest | 가장 오래된 오프셋부터 시작 |
| none | 오프셋 없으면 예외 발생 |
적용 시나리오
새 Consumer Group 생성 시:
┌─────────────────────────────────────────┐
│ earliest: 처음부터 모든 메시지 처리 │
│ latest: 새 메시지부터 처리 │
└─────────────────────────────────────────┘
기존 Group의 오프셋 만료 시:
┌─────────────────────────────────────────┐
│ offsets.retention.minutes (기본 7일) │
│ 이후 오프셋 삭제됨 → auto.offset.reset │
└─────────────────────────────────────────┘
외부 저장소에 Offset 저장
Exactly-once 처리를 위해 데이터와 오프셋을 함께 저장할 수 있습니다.
데이터베이스 활용 예제
public void processRecords(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
try {
// 트랜잭션 시작
database.beginTransaction();
// 데이터 저장
database.saveRecord(record.key(), record.value());
// 오프셋 저장 (같은 트랜잭션)
database.saveOffset(
record.topic(),
record.partition(),
record.offset()
);
// 트랜잭션 커밋
database.commit();
} catch (Exception e) {
database.rollback();
throw e;
}
}
}
// Consumer 시작 시 오프셋 복원
public void initializeConsumer() {
consumer.assign(partitions);
for (TopicPartition partition : partitions) {
Long savedOffset = database.getOffset(partition.topic(), partition.partition());
if (savedOffset != null) {
consumer.seek(partition, savedOffset + 1);
}
}
}Kafka 오프셋 커밋 비활성화
props.put("enable.auto.commit", "false");
// commitSync/commitAsync 호출하지 않음
// 대신 외부 저장소에 오프셋 저장리밸런싱과 Offset
리밸런싱 전 커밋
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 리밸런싱 전 현재까지 처리한 오프셋 커밋
log.info("Committing offsets before rebalancing: " + currentOffsets);
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Partitions assigned: " + partitions);
}
});외부 저장소 사용 시
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 외부 저장소에 오프셋 저장
for (TopicPartition partition : partitions) {
database.saveOffset(partition, currentOffsets.get(partition));
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 외부 저장소에서 오프셋 복원
for (TopicPartition partition : partitions) {
Long offset = database.getOffset(partition);
if (offset != null) {
consumer.seek(partition, offset);
}
}
}Best Practices
1. Manual Commit 사용 (중요 데이터)
props.put("enable.auto.commit", "false");
// 처리 완료 후 커밋
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
consumer.commitSync();2. 적절한 커밋 주기
// 너무 자주: 성능 저하
// 너무 드물게: 중복 처리 증가
int count = 0;
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
if (++count % 100 == 0) { // 100개마다 커밋
consumer.commitAsync();
}
}
consumer.commitSync(); // 마지막 커밋3. 멱등성 처리 구현
// 중복 메시지 방지
public void processRecord(ConsumerRecord<String, String> record) {
String messageId = record.headers().lastHeader("message-id").toString();
if (processedMessages.contains(messageId)) {
log.info("Skipping duplicate message: " + messageId);
return;
}
// 실제 처리
doProcess(record);
processedMessages.add(messageId);
}4. 에러 처리
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// 리밸런싱으로 인한 커밋 실패
log.error("Commit failed due to rebalancing", e);
// 재시도 또는 에러 처리
} catch (RetriableException e) {
// 일시적 오류 - 재시도 가능
log.warn("Retriable error, will retry", e);
}5. Offset 모니터링
// 현재 위치 확인
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
for (TopicPartition partition : consumer.assignment()) {
long currentPosition = consumer.position(partition);
long endOffset = endOffsets.get(partition);
long lag = endOffset - currentPosition;
log.info("Partition {}: position={}, end={}, lag={}",
partition, currentPosition, endOffset, lag);
}
댓글 (0)