Dead Letter Queue 패턴

처리 실패한 메시지를 별도 토픽으로 분리하여 관리하는 패턴입니다.

DLQ 개요

개념

┌─────────────────────────────────────────────────────────────────┐
│                    Dead Letter Queue Pattern                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Topic: orders                                                  │
│       │                                                         │
│       ▼                                                         │
│  ┌─────────────────────────────────────────────────┐           │
│  │              Consumer Application                │           │
│  │  ┌───────────────────────────────────────────┐  │           │
│  │  │           Message Processing              │  │           │
│  │  │                                           │  │           │
│  │  │  ┌─────────┐     ┌─────────┐             │  │           │
│  │  │  │ 성공    │     │  실패   │             │  │           │
│  │  │  │ 처리    │     │ (재시도 │             │  │           │
│  │  │  │         │     │  실패)  │             │  │           │
│  │  │  └────┬────┘     └────┬────┘             │  │           │
│  │  │       │               │                   │  │           │
│  │  │       ▼               ▼                   │  │           │
│  │  │  ┌─────────┐    ┌─────────┐              │  │           │
│  │  │  │   DB    │    │  DLQ    │              │  │           │
│  │  │  │ 저장    │    │ Topic   │              │  │           │
│  │  │  └─────────┘    └─────────┘              │  │           │
│  │  └───────────────────────────────────────────┘  │           │
│  └─────────────────────────────────────────────────┘           │
│                                                                 │
│  Topic: orders-dlq                                              │
│       │                                                         │
│       ▼                                                         │
│  ┌─────────────────┐                                           │
│  │ DLQ Processor   │ → 수동 검토, 재처리, 알림                   │
│  └─────────────────┘                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

DLQ가 필요한 경우

상황예시DLQ 필요성
영구적 실패잘못된 데이터 형식, 비즈니스 규칙 위반필수
재시도 한계 초과외부 서비스 장기 장애필수
파싱 오류손상된 메시지, 스키마 불일치필수
일시적 오류네트워크 타임아웃, DB 연결 실패재시도 후

DLQ 구현

기본 구현

public class DLQConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> dlqProducer;
    private static final int MAX_RETRIES = 3;
    private static final String DLQ_TOPIC_SUFFIX = "-dlq";
 
    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        String dlqTopic = topic + DLQ_TOPIC_SUFFIX;
 
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
            for (ConsumerRecord<String, String> record : records) {
                try {
                    processWithRetry(record);
                } catch (Exception e) {
                    sendToDLQ(record, e, dlqTopic);
                }
            }
            consumer.commitSync();
        }
    }
 
    private void processWithRetry(ConsumerRecord<String, String> record) throws Exception {
        int retries = 0;
        Exception lastException = null;
 
        while (retries < MAX_RETRIES) {
            try {
                process(record);
                return;  // 성공
            } catch (RetryableException e) {
                lastException = e;
                retries++;
                Thread.sleep(calculateBackoff(retries));
            } catch (NonRetryableException e) {
                throw e;  // 재시도 없이 DLQ로
            }
        }
        throw lastException;  // 재시도 한계 초과
    }
 
    private long calculateBackoff(int retries) {
        return (long) Math.pow(2, retries) * 1000;  // 지수 백오프
    }
}

DLQ 메시지 전송

private void sendToDLQ(ConsumerRecord<String, String> record,
                       Exception error,
                       String dlqTopic) {
    ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
        dlqTopic,
        record.key(),
        record.value()
    );
 
    // 원본 메타데이터 추가
    Headers headers = dlqRecord.headers();
    headers.add("dlq.original.topic", record.topic().getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.original.partition", String.valueOf(record.partition()).getBytes());
    headers.add("dlq.original.offset", String.valueOf(record.offset()).getBytes());
    headers.add("dlq.original.timestamp", String.valueOf(record.timestamp()).getBytes());
 
    // 에러 정보 추가
    headers.add("dlq.error.message", error.getMessage().getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.error.class", error.getClass().getName().getBytes(StandardCharsets.UTF_8));
    headers.add("dlq.error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
    headers.add("dlq.retry.count", String.valueOf(MAX_RETRIES).getBytes());
 
    // 스택 트레이스 (선택적)
    StringWriter sw = new StringWriter();
    error.printStackTrace(new PrintWriter(sw));
    headers.add("dlq.error.stacktrace", sw.toString().getBytes(StandardCharsets.UTF_8));
 
    dlqProducer.send(dlqRecord, (metadata, exception) -> {
        if (exception != null) {
            log.error("Failed to send to DLQ: {}", exception.getMessage());
            // DLQ 전송 실패 시 추가 처리 필요
        }
    });
}

예외 분류

// 재시도 가능한 예외
public class RetryableException extends RuntimeException {
    public RetryableException(String message, Throwable cause) {
        super(message, cause);
    }
}
 
// 재시도 불가능한 예외
public class NonRetryableException extends RuntimeException {
    public NonRetryableException(String message, Throwable cause) {
        super(message, cause);
    }
}
 
// 예외 분류기
public class ExceptionClassifier {
    public boolean isRetryable(Exception e) {
        if (e instanceof SocketTimeoutException) return true;
        if (e instanceof SQLTransientException) return true;
        if (e instanceof ConnectException) return true;
 
        if (e instanceof ParseException) return false;
        if (e instanceof ValidationException) return false;
        if (e instanceof NullPointerException) return false;
 
        return false;  // 기본: 재시도 안함
    }
}

DLQ 토픽 설계

네이밍 컨벤션

원본 토픽: orders
DLQ 토픽: orders-dlq

원본 토픽: user-events
DLQ 토픽: user-events-dlq

또는 중앙 DLQ:
dlq.orders
dlq.user-events

DLQ 토픽 설정

# DLQ 토픽 생성
kafka-topics.sh --create \
    --topic orders-dlq \
    --partitions 3 \
    --replication-factor 3 \
    --config retention.ms=604800000 \    # 7일 보관
    --config cleanup.policy=delete \
    --bootstrap-server localhost:9092

권장 설정

# 원본 토픽보다 적은 파티션 (처리량 낮음)
partitions=3
 
# 높은 내구성
replication.factor=3
min.insync.replicas=2
 
# 긴 보관 기간 (검토 시간 확보)
retention.ms=604800000  # 7일 또는 더 길게
 
# 압축 (Compaction 아님)
cleanup.policy=delete

DLQ 메시지 구조

헤더 스키마

public class DLQHeaders {
    // 원본 정보
    public static final String ORIGINAL_TOPIC = "dlq.original.topic";
    public static final String ORIGINAL_PARTITION = "dlq.original.partition";
    public static final String ORIGINAL_OFFSET = "dlq.original.offset";
    public static final String ORIGINAL_TIMESTAMP = "dlq.original.timestamp";
    public static final String ORIGINAL_KEY = "dlq.original.key";
 
    // 에러 정보
    public static final String ERROR_MESSAGE = "dlq.error.message";
    public static final String ERROR_CLASS = "dlq.error.class";
    public static final String ERROR_STACKTRACE = "dlq.error.stacktrace";
    public static final String ERROR_TIMESTAMP = "dlq.error.timestamp";
 
    // 처리 정보
    public static final String RETRY_COUNT = "dlq.retry.count";
    public static final String CONSUMER_GROUP = "dlq.consumer.group";
    public static final String CONSUMER_ID = "dlq.consumer.id";
}

Wrapper 메시지 (대안)

public class DLQMessage {
    // 원본 메시지
    private String originalTopic;
    private int originalPartition;
    private long originalOffset;
    private long originalTimestamp;
    private String originalKey;
    private String originalValue;
 
    // 에러 정보
    private String errorMessage;
    private String errorClass;
    private String errorStacktrace;
    private long errorTimestamp;
 
    // 처리 정보
    private int retryCount;
    private String consumerGroup;
    private String consumerId;
 
    // 재처리 지원
    private int reprocessCount;
    private String lastReprocessError;
}

DLQ 처리

DLQ Processor

public class DLQProcessor {
    private final KafkaConsumer<String, String> dlqConsumer;
    private final KafkaProducer<String, String> producer;
 
    public void processDLQ(String dlqTopic) {
        dlqConsumer.subscribe(Collections.singletonList(dlqTopic));
 
        while (running) {
            ConsumerRecords<String, String> records = dlqConsumer.poll(Duration.ofMillis(100));
 
            for (ConsumerRecord<String, String> record : records) {
                DLQMessage dlqMessage = parseDLQMessage(record);
 
                // 1. 에러 분석
                ErrorAnalysis analysis = analyzeError(dlqMessage);
 
                // 2. 처리 결정
                switch (analysis.getAction()) {
                    case REPROCESS:
                        reprocessMessage(dlqMessage);
                        break;
                    case ALERT:
                        sendAlert(dlqMessage);
                        break;
                    case DISCARD:
                        logAndDiscard(dlqMessage);
                        break;
                    case MANUAL_REVIEW:
                        markForManualReview(dlqMessage);
                        break;
                }
            }
            dlqConsumer.commitSync();
        }
    }
 
    private void reprocessMessage(DLQMessage message) {
        // 원본 토픽으로 재전송
        ProducerRecord<String, String> record = new ProducerRecord<>(
            message.getOriginalTopic(),
            message.getOriginalKey(),
            message.getOriginalValue()
        );
 
        // 재처리 횟수 기록
        record.headers().add("reprocess.count",
            String.valueOf(message.getReprocessCount() + 1).getBytes());
 
        producer.send(record);
    }
}

자동 재처리 스케줄러

@Scheduled(fixedDelay = 3600000)  // 1시간마다
public void scheduleReprocess() {
    List<ConsumerRecord<String, String>> candidates = findReprocessCandidates();
 
    for (ConsumerRecord<String, String> record : candidates) {
        DLQMessage message = parseDLQMessage(record);
 
        // 재처리 조건 확인
        if (isEligibleForReprocess(message)) {
            reprocessMessage(message);
        }
    }
}
 
private boolean isEligibleForReprocess(DLQMessage message) {
    // 재처리 횟수 제한
    if (message.getReprocessCount() >= 3) return false;
 
    // 에러 유형 확인
    if (isPermanentError(message.getErrorClass())) return false;
 
    // 시간 경과 확인 (일정 시간 후 재시도)
    long elapsed = System.currentTimeMillis() - message.getErrorTimestamp();
    return elapsed >= getBackoffTime(message.getReprocessCount());
}

모니터링

메트릭

// Micrometer 메트릭
private final Counter dlqMessagesTotal;
private final Counter dlqReprocessSuccessTotal;
private final Counter dlqReprocessFailureTotal;
private final Gauge dlqPendingGauge;
 
public void recordDLQMetrics() {
    dlqMessagesTotal.increment();
 
    // DLQ 대기 메시지 수
    long pendingCount = getDLQMessageCount();
    dlqPendingGauge.set(pendingCount);
}

알림 설정

# Prometheus Alert Rules
groups:
  - name: kafka-dlq-alerts
    rules:
      - alert: HighDLQRate
        expr: rate(dlq_messages_total[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High DLQ message rate"
          description: "DLQ receiving {{ $value }} messages/s"
 
      - alert: DLQBacklogHigh
        expr: dlq_pending_messages > 1000
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "DLQ backlog is growing"
 
      - alert: DLQReprocessFailures
        expr: rate(dlq_reprocess_failure_total[1h]) > 0
        for: 1h
        labels:
          severity: info
        annotations:
          summary: "DLQ reprocess failures detected"

대시보드

DLQ Dashboard:
┌─────────────────────────────────────────────────────────────────┐
│  DLQ Messages Rate          │  DLQ Backlog                      │
│  ┌────────────────────────┐ │  ┌────────────────────────┐      │
│  │    ▲                   │ │  │                        │      │
│  │   /│\    /\            │ │  │  1,234 messages        │      │
│  │  / │ \  /  \           │ │  │  pending               │      │
│  └────────────────────────┘ │  └────────────────────────┘      │
├─────────────────────────────┼───────────────────────────────────┤
│  Error Types                │  Reprocess Status                 │
│  ┌────────────────────────┐ │  ┌────────────────────────┐      │
│  │ Validation: 45%        │ │  │ Success: 78%           │      │
│  │ Timeout: 30%           │ │  │ Failed: 12%            │      │
│  │ Parse: 15%             │ │  │ Pending: 10%           │      │
│  │ Other: 10%             │ │  │                        │      │
│  └────────────────────────┘ │  └────────────────────────┘      │
└─────────────────────────────────────────────────────────────────┘

운영 절차

DLQ 검토 프로세스

1. 일일 검토
   - DLQ 메시지 수 확인
   - 에러 유형 분류
   - 재처리 가능 여부 판단

2. 원인 분석
   - 반복 패턴 식별
   - 근본 원인 추적
   - 시스템 개선 필요 여부

3. 처리 결정
   - 자동 재처리 설정
   - 수동 수정 후 재처리
   - 폐기 (로그 남김)

4. 사후 조치
   - 코드 수정
   - 검증 로직 추가
   - 모니터링 강화

수동 재처리 도구

#!/bin/bash
# dlq-reprocess.sh
 
DLQ_TOPIC=$1
ORIGINAL_TOPIC=$2
MAX_MESSAGES=${3:-100}
 
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic $DLQ_TOPIC \
    --max-messages $MAX_MESSAGES \
    --from-beginning \
    | kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic $ORIGINAL_TOPIC

Best Practices

1. DLQ 정책 수립

dlq_policy:
  retry:
    max_attempts: 3
    backoff_multiplier: 2
    initial_delay_ms: 1000
 
  reprocess:
    enabled: true
    max_attempts: 3
    schedule: "0 0 * * *"  # 매일 자정
 
  retention:
    days: 30
    archive_enabled: true
 
  alerting:
    high_rate_threshold: 10/min
    backlog_threshold: 1000

2. 에러 분류 체계

public enum ErrorCategory {
    TRANSIENT,      // 일시적 - 자동 재시도
    DATA_ERROR,     // 데이터 오류 - 수정 후 재처리
    SYSTEM_ERROR,   // 시스템 오류 - 수정 배포 후 재처리
    PERMANENT       // 영구적 - 폐기 또는 수동 처리
}

3. DLQ 토픽 관리

✓ 원본 토픽당 전용 DLQ
✓ 적절한 보관 기간
✓ 정기적인 정리 작업
✓ 접근 권한 제어

4. 문서화

## DLQ 운영 가이드
 
### 에러 유형별 대응
| 에러 | 원인 | 대응 |
|------|------|------|
| ValidationException | 잘못된 데이터 | 데이터 수정 후 재처리 |
| TimeoutException | 외부 서비스 지연 | 자동 재처리 대기 |
| ParseException | 스키마 불일치 | 스키마 확인 및 수정 |
 
### 재처리 절차
1. DLQ 메시지 확인
2. 원인 분석
3. 필요시 수정
4. 재처리 실행
5. 결과 확인

관련 문서