Dead Letter Queue 패턴
처리 실패한 메시지를 별도 토픽으로 분리하여 관리하는 패턴입니다.
DLQ 개요
개념
┌─────────────────────────────────────────────────────────────────┐
│ Dead Letter Queue Pattern │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Consumer Application │ │
│ │ ┌───────────────────────────────────────────┐ │ │
│ │ │ Message Processing │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ 성공 │ │ 실패 │ │ │ │
│ │ │ │ 처리 │ │ (재시도 │ │ │ │
│ │ │ │ │ │ 실패) │ │ │ │
│ │ │ └────┬────┘ └────┬────┘ │ │ │
│ │ │ │ │ │ │ │
│ │ │ ▼ ▼ │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ DB │ │ DLQ │ │ │ │
│ │ │ │ 저장 │ │ Topic │ │ │ │
│ │ │ └─────────┘ └─────────┘ │ │ │
│ │ └───────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ Topic: orders-dlq │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ DLQ Processor │ → 수동 검토, 재처리, 알림 │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
DLQ가 필요한 경우
| 상황 | 예시 | DLQ 필요성 |
|---|---|---|
| 영구적 실패 | 잘못된 데이터 형식, 비즈니스 규칙 위반 | 필수 |
| 재시도 한계 초과 | 외부 서비스 장기 장애 | 필수 |
| 파싱 오류 | 손상된 메시지, 스키마 불일치 | 필수 |
| 일시적 오류 | 네트워크 타임아웃, DB 연결 실패 | 재시도 후 |
DLQ 구현
기본 구현
public class DLQConsumer {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> dlqProducer;
private static final int MAX_RETRIES = 3;
private static final String DLQ_TOPIC_SUFFIX = "-dlq";
public void consume(String topic) {
consumer.subscribe(Collections.singletonList(topic));
String dlqTopic = topic + DLQ_TOPIC_SUFFIX;
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processWithRetry(record);
} catch (Exception e) {
sendToDLQ(record, e, dlqTopic);
}
}
consumer.commitSync();
}
}
private void processWithRetry(ConsumerRecord<String, String> record) throws Exception {
int retries = 0;
Exception lastException = null;
while (retries < MAX_RETRIES) {
try {
process(record);
return; // 성공
} catch (RetryableException e) {
lastException = e;
retries++;
Thread.sleep(calculateBackoff(retries));
} catch (NonRetryableException e) {
throw e; // 재시도 없이 DLQ로
}
}
throw lastException; // 재시도 한계 초과
}
private long calculateBackoff(int retries) {
return (long) Math.pow(2, retries) * 1000; // 지수 백오프
}
}DLQ 메시지 전송
private void sendToDLQ(ConsumerRecord<String, String> record,
Exception error,
String dlqTopic) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
dlqTopic,
record.key(),
record.value()
);
// 원본 메타데이터 추가
Headers headers = dlqRecord.headers();
headers.add("dlq.original.topic", record.topic().getBytes(StandardCharsets.UTF_8));
headers.add("dlq.original.partition", String.valueOf(record.partition()).getBytes());
headers.add("dlq.original.offset", String.valueOf(record.offset()).getBytes());
headers.add("dlq.original.timestamp", String.valueOf(record.timestamp()).getBytes());
// 에러 정보 추가
headers.add("dlq.error.message", error.getMessage().getBytes(StandardCharsets.UTF_8));
headers.add("dlq.error.class", error.getClass().getName().getBytes(StandardCharsets.UTF_8));
headers.add("dlq.error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
headers.add("dlq.retry.count", String.valueOf(MAX_RETRIES).getBytes());
// 스택 트레이스 (선택적)
StringWriter sw = new StringWriter();
error.printStackTrace(new PrintWriter(sw));
headers.add("dlq.error.stacktrace", sw.toString().getBytes(StandardCharsets.UTF_8));
dlqProducer.send(dlqRecord, (metadata, exception) -> {
if (exception != null) {
log.error("Failed to send to DLQ: {}", exception.getMessage());
// DLQ 전송 실패 시 추가 처리 필요
}
});
}예외 분류
// 재시도 가능한 예외
public class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) {
super(message, cause);
}
}
// 재시도 불가능한 예외
public class NonRetryableException extends RuntimeException {
public NonRetryableException(String message, Throwable cause) {
super(message, cause);
}
}
// 예외 분류기
public class ExceptionClassifier {
public boolean isRetryable(Exception e) {
if (e instanceof SocketTimeoutException) return true;
if (e instanceof SQLTransientException) return true;
if (e instanceof ConnectException) return true;
if (e instanceof ParseException) return false;
if (e instanceof ValidationException) return false;
if (e instanceof NullPointerException) return false;
return false; // 기본: 재시도 안함
}
}DLQ 토픽 설계
네이밍 컨벤션
원본 토픽: orders
DLQ 토픽: orders-dlq
원본 토픽: user-events
DLQ 토픽: user-events-dlq
또는 중앙 DLQ:
dlq.orders
dlq.user-events
DLQ 토픽 설정
# DLQ 토픽 생성
kafka-topics.sh --create \
--topic orders-dlq \
--partitions 3 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7일 보관
--config cleanup.policy=delete \
--bootstrap-server localhost:9092권장 설정
# 원본 토픽보다 적은 파티션 (처리량 낮음)
partitions=3
# 높은 내구성
replication.factor=3
min.insync.replicas=2
# 긴 보관 기간 (검토 시간 확보)
retention.ms=604800000 # 7일 또는 더 길게
# 압축 (Compaction 아님)
cleanup.policy=deleteDLQ 메시지 구조
헤더 스키마
public class DLQHeaders {
// 원본 정보
public static final String ORIGINAL_TOPIC = "dlq.original.topic";
public static final String ORIGINAL_PARTITION = "dlq.original.partition";
public static final String ORIGINAL_OFFSET = "dlq.original.offset";
public static final String ORIGINAL_TIMESTAMP = "dlq.original.timestamp";
public static final String ORIGINAL_KEY = "dlq.original.key";
// 에러 정보
public static final String ERROR_MESSAGE = "dlq.error.message";
public static final String ERROR_CLASS = "dlq.error.class";
public static final String ERROR_STACKTRACE = "dlq.error.stacktrace";
public static final String ERROR_TIMESTAMP = "dlq.error.timestamp";
// 처리 정보
public static final String RETRY_COUNT = "dlq.retry.count";
public static final String CONSUMER_GROUP = "dlq.consumer.group";
public static final String CONSUMER_ID = "dlq.consumer.id";
}Wrapper 메시지 (대안)
public class DLQMessage {
// 원본 메시지
private String originalTopic;
private int originalPartition;
private long originalOffset;
private long originalTimestamp;
private String originalKey;
private String originalValue;
// 에러 정보
private String errorMessage;
private String errorClass;
private String errorStacktrace;
private long errorTimestamp;
// 처리 정보
private int retryCount;
private String consumerGroup;
private String consumerId;
// 재처리 지원
private int reprocessCount;
private String lastReprocessError;
}DLQ 처리
DLQ Processor
public class DLQProcessor {
private final KafkaConsumer<String, String> dlqConsumer;
private final KafkaProducer<String, String> producer;
public void processDLQ(String dlqTopic) {
dlqConsumer.subscribe(Collections.singletonList(dlqTopic));
while (running) {
ConsumerRecords<String, String> records = dlqConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
DLQMessage dlqMessage = parseDLQMessage(record);
// 1. 에러 분석
ErrorAnalysis analysis = analyzeError(dlqMessage);
// 2. 처리 결정
switch (analysis.getAction()) {
case REPROCESS:
reprocessMessage(dlqMessage);
break;
case ALERT:
sendAlert(dlqMessage);
break;
case DISCARD:
logAndDiscard(dlqMessage);
break;
case MANUAL_REVIEW:
markForManualReview(dlqMessage);
break;
}
}
dlqConsumer.commitSync();
}
}
private void reprocessMessage(DLQMessage message) {
// 원본 토픽으로 재전송
ProducerRecord<String, String> record = new ProducerRecord<>(
message.getOriginalTopic(),
message.getOriginalKey(),
message.getOriginalValue()
);
// 재처리 횟수 기록
record.headers().add("reprocess.count",
String.valueOf(message.getReprocessCount() + 1).getBytes());
producer.send(record);
}
}자동 재처리 스케줄러
@Scheduled(fixedDelay = 3600000) // 1시간마다
public void scheduleReprocess() {
List<ConsumerRecord<String, String>> candidates = findReprocessCandidates();
for (ConsumerRecord<String, String> record : candidates) {
DLQMessage message = parseDLQMessage(record);
// 재처리 조건 확인
if (isEligibleForReprocess(message)) {
reprocessMessage(message);
}
}
}
private boolean isEligibleForReprocess(DLQMessage message) {
// 재처리 횟수 제한
if (message.getReprocessCount() >= 3) return false;
// 에러 유형 확인
if (isPermanentError(message.getErrorClass())) return false;
// 시간 경과 확인 (일정 시간 후 재시도)
long elapsed = System.currentTimeMillis() - message.getErrorTimestamp();
return elapsed >= getBackoffTime(message.getReprocessCount());
}모니터링
메트릭
// Micrometer 메트릭
private final Counter dlqMessagesTotal;
private final Counter dlqReprocessSuccessTotal;
private final Counter dlqReprocessFailureTotal;
private final Gauge dlqPendingGauge;
public void recordDLQMetrics() {
dlqMessagesTotal.increment();
// DLQ 대기 메시지 수
long pendingCount = getDLQMessageCount();
dlqPendingGauge.set(pendingCount);
}알림 설정
# Prometheus Alert Rules
groups:
- name: kafka-dlq-alerts
rules:
- alert: HighDLQRate
expr: rate(dlq_messages_total[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "High DLQ message rate"
description: "DLQ receiving {{ $value }} messages/s"
- alert: DLQBacklogHigh
expr: dlq_pending_messages > 1000
for: 15m
labels:
severity: warning
annotations:
summary: "DLQ backlog is growing"
- alert: DLQReprocessFailures
expr: rate(dlq_reprocess_failure_total[1h]) > 0
for: 1h
labels:
severity: info
annotations:
summary: "DLQ reprocess failures detected"대시보드
DLQ Dashboard:
┌─────────────────────────────────────────────────────────────────┐
│ DLQ Messages Rate │ DLQ Backlog │
│ ┌────────────────────────┐ │ ┌────────────────────────┐ │
│ │ ▲ │ │ │ │ │
│ │ /│\ /\ │ │ │ 1,234 messages │ │
│ │ / │ \ / \ │ │ │ pending │ │
│ └────────────────────────┘ │ └────────────────────────┘ │
├─────────────────────────────┼───────────────────────────────────┤
│ Error Types │ Reprocess Status │
│ ┌────────────────────────┐ │ ┌────────────────────────┐ │
│ │ Validation: 45% │ │ │ Success: 78% │ │
│ │ Timeout: 30% │ │ │ Failed: 12% │ │
│ │ Parse: 15% │ │ │ Pending: 10% │ │
│ │ Other: 10% │ │ │ │ │
│ └────────────────────────┘ │ └────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
운영 절차
DLQ 검토 프로세스
1. 일일 검토
- DLQ 메시지 수 확인
- 에러 유형 분류
- 재처리 가능 여부 판단
2. 원인 분석
- 반복 패턴 식별
- 근본 원인 추적
- 시스템 개선 필요 여부
3. 처리 결정
- 자동 재처리 설정
- 수동 수정 후 재처리
- 폐기 (로그 남김)
4. 사후 조치
- 코드 수정
- 검증 로직 추가
- 모니터링 강화
수동 재처리 도구
#!/bin/bash
# dlq-reprocess.sh
DLQ_TOPIC=$1
ORIGINAL_TOPIC=$2
MAX_MESSAGES=${3:-100}
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic $DLQ_TOPIC \
--max-messages $MAX_MESSAGES \
--from-beginning \
| kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic $ORIGINAL_TOPICBest Practices
1. DLQ 정책 수립
dlq_policy:
retry:
max_attempts: 3
backoff_multiplier: 2
initial_delay_ms: 1000
reprocess:
enabled: true
max_attempts: 3
schedule: "0 0 * * *" # 매일 자정
retention:
days: 30
archive_enabled: true
alerting:
high_rate_threshold: 10/min
backlog_threshold: 10002. 에러 분류 체계
public enum ErrorCategory {
TRANSIENT, // 일시적 - 자동 재시도
DATA_ERROR, // 데이터 오류 - 수정 후 재처리
SYSTEM_ERROR, // 시스템 오류 - 수정 배포 후 재처리
PERMANENT // 영구적 - 폐기 또는 수동 처리
}3. DLQ 토픽 관리
✓ 원본 토픽당 전용 DLQ
✓ 적절한 보관 기간
✓ 정기적인 정리 작업
✓ 접근 권한 제어
4. 문서화
## DLQ 운영 가이드
### 에러 유형별 대응
| 에러 | 원인 | 대응 |
|------|------|------|
| ValidationException | 잘못된 데이터 | 데이터 수정 후 재처리 |
| TimeoutException | 외부 서비스 지연 | 자동 재처리 대기 |
| ParseException | 스키마 불일치 | 스키마 확인 및 수정 |
### 재처리 절차
1. DLQ 메시지 확인
2. 원인 분석
3. 필요시 수정
4. 재처리 실행
5. 결과 확인
댓글 (0)