Offset 관리

Offset은 Kafka에서 Consumer가 메시지를 어디까지 읽었는지 추적하는 핵심 메커니즘입니다.

Offset 개념

정의

Offset은 파티션 내에서 각 메시지의 고유한 순차적 ID입니다.

Partition 0:
┌────┬────┬────┬────┬────┬────┬────┐
│ 0  │ 1  │ 2  │ 3  │ 4  │ 5  │ 6  │  ← Offset
└────┴────┴────┴────┴────┴────┴────┘
              ↑           ↑
         Committed    Current
          Offset      Position

Offset 종류

Offset 종류설명
Log End Offset파티션의 마지막 메시지 위치
Current PositionConsumer가 현재 읽고 있는 위치
Committed OffsetConsumer Group이 커밋한 마지막 위치
Consumer LagLog End Offset - Committed Offset

Offset 저장 위치

__consumer_offsets 토픽

Kafka는 Consumer Group의 오프셋을 __consumer_offsets 내부 토픽에 저장합니다.

Key: (group.id, topic, partition)
Value: (offset, metadata, timestamp)

특징:

  • 기본 50개 파티션
  • Compacted 토픽 (최신 값만 유지)
  • 고가용성 보장 (replication factor 설정 가능)

Coordinator를 통한 조회

Consumer → FindCoordinator → Group Coordinator
         → OffsetFetch → __consumer_offsets 파티션 리더
         ← 커밋된 오프셋 반환

Auto Commit vs Manual Commit

Auto Commit (자동 커밋)

설정

props.put("enable.auto.commit", "true");  // 기본값
props.put("auto.commit.interval.ms", "5000");  // 5초마다 커밋

동작 방식

poll() 호출 시 자동으로 이전에 받은 오프셋을 커밋

Timeline:
poll() ─────── process ─────── poll() ─────── process ─────── poll()
                                 ↑
                          auto commit 발생
                      (이전 poll()의 오프셋)

장점

  • 구현 간단
  • 코드 복잡도 낮음

단점

  • 메시지 중복 가능 (At-least-once)
  • 메시지 손실 가능 (처리 전 커밋)
  • 정밀한 제어 불가

중복 발생 시나리오

1. poll() → offset 100-200 수신
2. 메시지 처리 중... (offset 150까지 처리)
3. Consumer 크래시!
4. 재시작 → 마지막 커밋 offset 100부터 재처리
5. offset 100-150 중복 처리됨

Manual Commit (수동 커밋)

설정

props.put("enable.auto.commit", "false");

Synchronous Commit (동기 커밋)

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
 
        // 모든 메시지 처리 후 동기 커밋
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    log.error("Commit failed", e);
}

특징:

  • 커밋 완료까지 블로킹
  • 커밋 실패 시 예외 발생
  • 가장 안전한 방법

Asynchronous Commit (비동기 커밋)

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
 
    // 비동기 커밋
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed for offsets: " + offsets, exception);
        }
    });
}

특징:

  • 논블로킹
  • 재시도 없음 (순서 문제 방지)
  • 더 높은 처리량

동기 + 비동기 조합 (권장)

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
 
        // 일반적으로 비동기 커밋 (높은 처리량)
        consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Error in poll loop", e);
} finally {
    try {
        // 종료 시 동기 커밋 (안전한 종료)
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

Commit 방식 비교

특성Auto CommitSync CommitAsync Commit
구현 난이도쉬움중간중간
처리량높음낮음높음
신뢰성낮음높음중간
중복 가능성있음낮음있음
재시도없음자동없음

특정 Offset 커밋

레코드별 커밋

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
 
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
 
    // 현재 레코드의 다음 오프셋을 저장
    currentOffsets.put(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)  // 다음에 읽을 오프셋
    );
 
    // N개마다 커밋
    if (count % 100 == 0) {
        consumer.commitAsync(currentOffsets, null);
    }
}

파티션별 커밋

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
 
    for (ConsumerRecord<String, String> record : partitionRecords) {
        processRecord(record);
    }
 
    // 파티션별 마지막 오프셋 + 1 커밋
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
 
consumer.commitSync(offsets);

Offset 탐색 (Seek)

특정 Offset으로 이동

TopicPartition partition = new TopicPartition("my-topic", 0);
 
// assign 사용 시
consumer.assign(Arrays.asList(partition));
 
// 특정 오프셋으로 이동
consumer.seek(partition, 100);
 
// 처음으로 이동
consumer.seekToBeginning(Arrays.asList(partition));
 
// 끝으로 이동
consumer.seekToEnd(Arrays.asList(partition));

타임스탬프 기반 탐색

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(partition, System.currentTimeMillis() - 3600000);  // 1시간 전
 
// 타임스탬프에 해당하는 오프셋 조회
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(timestampsToSearch);
 
// 해당 오프셋으로 이동
OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
if (offsetAndTimestamp != null) {
    consumer.seek(partition, offsetAndTimestamp.offset());
}

활용 사례

1. 장애 복구

// 특정 시점부터 재처리
long failureTime = getFailureTimestamp();
Map<TopicPartition, Long> timestamps = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
    timestamps.put(tp, failureTime);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
// seek to offsets...

2. 데이터 재처리

// 모든 파티션을 처음부터 재처리
consumer.seekToBeginning(consumer.assignment());

3. 최신 데이터만 처리

// 현재 시점부터 새로운 메시지만 처리
consumer.seekToEnd(consumer.assignment());

Offset 초기화 정책

auto.offset.reset

Consumer Group이 오프셋을 찾지 못할 때의 동작을 정의합니다.

props.put("auto.offset.reset", "latest");  // 기본값
동작
latest가장 최신 오프셋부터 시작
earliest가장 오래된 오프셋부터 시작
none오프셋 없으면 예외 발생

적용 시나리오

새 Consumer Group 생성 시:
┌─────────────────────────────────────────┐
│ earliest: 처음부터 모든 메시지 처리      │
│ latest: 새 메시지부터 처리              │
└─────────────────────────────────────────┘

기존 Group의 오프셋 만료 시:
┌─────────────────────────────────────────┐
│ offsets.retention.minutes (기본 7일)    │
│ 이후 오프셋 삭제됨 → auto.offset.reset  │
└─────────────────────────────────────────┘

외부 저장소에 Offset 저장

Exactly-once 처리를 위해 데이터와 오프셋을 함께 저장할 수 있습니다.

데이터베이스 활용 예제

public void processRecords(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        try {
            // 트랜잭션 시작
            database.beginTransaction();
 
            // 데이터 저장
            database.saveRecord(record.key(), record.value());
 
            // 오프셋 저장 (같은 트랜잭션)
            database.saveOffset(
                record.topic(),
                record.partition(),
                record.offset()
            );
 
            // 트랜잭션 커밋
            database.commit();
 
        } catch (Exception e) {
            database.rollback();
            throw e;
        }
    }
}
 
// Consumer 시작 시 오프셋 복원
public void initializeConsumer() {
    consumer.assign(partitions);
 
    for (TopicPartition partition : partitions) {
        Long savedOffset = database.getOffset(partition.topic(), partition.partition());
        if (savedOffset != null) {
            consumer.seek(partition, savedOffset + 1);
        }
    }
}

Kafka 오프셋 커밋 비활성화

props.put("enable.auto.commit", "false");
// commitSync/commitAsync 호출하지 않음
// 대신 외부 저장소에 오프셋 저장

리밸런싱과 Offset

리밸런싱 전 커밋

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
 
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 리밸런싱 전 현재까지 처리한 오프셋 커밋
        log.info("Committing offsets before rebalancing: " + currentOffsets);
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }
 
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: " + partitions);
    }
});

외부 저장소 사용 시

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 외부 저장소에 오프셋 저장
    for (TopicPartition partition : partitions) {
        database.saveOffset(partition, currentOffsets.get(partition));
    }
}
 
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 외부 저장소에서 오프셋 복원
    for (TopicPartition partition : partitions) {
        Long offset = database.getOffset(partition);
        if (offset != null) {
            consumer.seek(partition, offset);
        }
    }
}

Best Practices

1. Manual Commit 사용 (중요 데이터)

props.put("enable.auto.commit", "false");
 
// 처리 완료 후 커밋
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
}
consumer.commitSync();

2. 적절한 커밋 주기

// 너무 자주: 성능 저하
// 너무 드물게: 중복 처리 증가
 
int count = 0;
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    if (++count % 100 == 0) {  // 100개마다 커밋
        consumer.commitAsync();
    }
}
consumer.commitSync();  // 마지막 커밋

3. 멱등성 처리 구현

// 중복 메시지 방지
public void processRecord(ConsumerRecord<String, String> record) {
    String messageId = record.headers().lastHeader("message-id").toString();
 
    if (processedMessages.contains(messageId)) {
        log.info("Skipping duplicate message: " + messageId);
        return;
    }
 
    // 실제 처리
    doProcess(record);
    processedMessages.add(messageId);
}

4. 에러 처리

try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // 리밸런싱으로 인한 커밋 실패
    log.error("Commit failed due to rebalancing", e);
    // 재시도 또는 에러 처리
} catch (RetriableException e) {
    // 일시적 오류 - 재시도 가능
    log.warn("Retriable error, will retry", e);
}

5. Offset 모니터링

// 현재 위치 확인
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
 
for (TopicPartition partition : consumer.assignment()) {
    long currentPosition = consumer.position(partition);
    long endOffset = endOffsets.get(partition);
    long lag = endOffset - currentPosition;
 
    log.info("Partition {}: position={}, end={}, lag={}",
        partition, currentPosition, endOffset, lag);
}

관련 문서