Error Handling Strategies

This section examines strategies for handling the various errors that can occur in Kafka-based systems.

Error Types

Error Classification

┌────────────────────────────────────────────────────┐
│                  Error Categories                  │
├────────────────────────────────────────────────────┤
│                                                    │
│  ┌──────────────────────────────────────────────┐  │
│  │               Transient Errors               │  │
│  │  - Network timeouts                          │  │
│  │  - Broker temporarily unavailable            │  │
│  │  - Temporary resource exhaustion             │  │
│  │  → Resolved by retrying                      │  │
│  └──────────────────────────────────────────────┘  │
│                                                    │
│  ┌──────────────────────────────────────────────┐  │
│  │              Recoverable Errors              │  │
│  │  - External service outage                   │  │
│  │  - Database connection failure               │  │
│  │  - Rate limiting                             │  │
│  │  → Resolved by backoff + retry               │  │
│  └──────────────────────────────────────────────┘  │
│                                                    │
│  ┌──────────────────────────────────────────────┐  │
│  │            Non-Recoverable Errors            │  │
│  │  - Malformed data                            │  │
│  │  - Business rule violation                   │  │
│  │  - Schema mismatch                           │  │
│  │  → Route to the DLQ                          │  │
│  └──────────────────────────────────────────────┘  │
│                                                    │
└────────────────────────────────────────────────────┘

Classification by Error Source

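| Source | Examples | Handling strategy |
|--------|----------|-------------------|
| Kafka | LeaderNotAvailable, NotEnoughReplicas | Retry |
| Serialization | SerializationException | DLQ |
| Business logic | ValidationException | DLQ or fix |
| External system | ServiceUnavailable, Timeout | Retry + backoff |
| Infrastructure | OutOfMemory, DiskFull | Alert + stop |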
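
The mapping in the table above can be written down as a small lookup so that handling code consults one place instead of scattering the decision. The following is a minimal sketch using only the JDK. The names `ErrorStrategyTable`, `ErrorSource`, `Strategy`, `strategyFor`, and `isRetried` are hypothetical and are not part of any Kafka API.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: encodes the source-to-strategy table as a lookup.
final class ErrorStrategyTable {
    enum ErrorSource { KAFKA, SERIALIZATION, BUSINESS_LOGIC, EXTERNAL_SYSTEM, INFRASTRUCTURE }

    enum Strategy { RETRY, DLQ, DLQ_OR_FIX, RETRY_WITH_BACKOFF, ALERT_AND_STOP }

    private static final Map<ErrorSource, Strategy> TABLE = new EnumMap<>(ErrorSource.class);

    static {
        TABLE.put(ErrorSource.KAFKA, Strategy.RETRY);
        TABLE.put(ErrorSource.SERIALIZATION, Strategy.DLQ);
        TABLE.put(ErrorSource.BUSINESS_LOGIC, Strategy.DLQ_OR_FIX);
        TABLE.put(ErrorSource.EXTERNAL_SYSTEM, Strategy.RETRY_WITH_BACKOFF);
        TABLE.put(ErrorSource.INFRASTRUCTURE, Strategy.ALERT_AND_STOP);
    }

    private ErrorStrategyTable() { }

    /** Returns the handling strategy listed for the given error source. */
    static Strategy strategyFor(ErrorSource source) {
        return TABLE.get(source);
    }

    /** True when the strategy involves trying the same operation again. */
    static boolean isRetried(ErrorSource source) {
        Strategy strategy = strategyFor(source);
        return strategy == Strategy.RETRY || strategy == Strategy.RETRY_WITH_BACKOFF;
    }
}
```

How a concrete exception is assigned to an `ErrorSource` is left open here; that decision depends on the exception types an application actually uses.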

Producer Error Handling

Handling Send Failures

public class ResilientProducer {
    private final KafkaProducer<String, String> producer;
    private final KafkaProducer<String, String> fallbackProducer;  // Fallback producer
 
    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
 
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                handleProducerError(record, exception);
            } else {
                log.debug("Sent to partition {} offset {}",
                    metadata.partition(), metadata.offset());
            }
        });
    }
 
    private void handleProducerError(ProducerRecord<String, String> record,
                                     Exception exception) {
        if (isRetryable(exception)) {
            // Add to the retry queue
            retryQueue.add(new RetryRecord(record, 1));
        } else if (isRecoverable(exception)) {
            // Use the fallback producer
            fallbackProducer.send(record);
        } else {
            // Save locally and alert operations
            saveToLocalStorage(record);
            alertOps(exception);
        }
    }
 
    private boolean isRetryable(Exception e) {
        return e instanceof TimeoutException ||
               e instanceof NotEnoughReplicasException ||
               e instanceof LeaderNotAvailableException;
    }
}
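
`handleProducerError` calls `retryQueue.add(new RetryRecord(record, 1))`, but neither type is shown. One possible shape is sketched below with the JDK's `DelayQueue`, so that a record only becomes visible again once its backoff has elapsed. Everything here is an assumption for illustration: the class names, the two-argument `add`, the doubling delay, and the generic payload `T` that stands in for a `ProducerRecord`.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a delayed retry queue; T stands in for ProducerRecord<K, V>.
final class RetryQueue<T> {

    static final class RetryRecord<T> implements Delayed {
        final T payload;
        final int attempt;
        private final long readyAtNanos;

        RetryRecord(T payload, int attempt, long delayMs) {
            this.payload = payload;
            this.attempt = attempt;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<RetryRecord<T>> queue = new DelayQueue<>();
    private final int maxAttempts;
    private final long baseDelayMs;

    RetryQueue(int maxAttempts, long baseDelayMs) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
    }

    /** Schedules a retry. Returns false once attempts are exhausted so the caller can fall back. */
    boolean add(T payload, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        if (attempt > maxAttempts) {
            return false;
        }
        long delayMs = baseDelayMs << (attempt - 1);  // doubles with every attempt
        queue.add(new RetryRecord<>(payload, attempt, delayMs));
        return true;
    }

    /** Returns the next record whose delay has elapsed, or null if none is ready yet. */
    RetryRecord<T> pollReady() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

A background thread would typically drain `pollReady()` and resend each payload, calling `add(payload, attempt + 1)` again on failure and switching to the fallback path when `add` returns false.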

Optimizing Producer Settings

Properties props = new Properties();
 
// Retry settings
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("delivery.timeout.ms", 120000);  // Upper bound on total send time, including retries
 
// Idempotence (prevents duplicates caused by retries)
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
 
// Durability
props.put("acks", "all");

Handling Batch Send Errors

public class BatchProducer {
    public void sendBatch(List<Message> messages) {
        List<Future<RecordMetadata>> futures = new ArrayList<>();
 
        for (Message msg : messages) {
            Future<RecordMetadata> future = producer.send(
                new ProducerRecord<>("topic", msg.getKey(), msg.getValue())
            );
            futures.add(future);
        }
 
        // Collect the results
        List<Message> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                failed.add(messages.get(i));
            }
        }
 
        // Handle the failed messages
        if (!failed.isEmpty()) {
            handleFailedBatch(failed);
        }
    }
}

Consumer Error Handling

Processing Failure Strategy

public class ResilientConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final ErrorHandler errorHandler;
    private static final int MAX_RETRIES = 3;
 
    public void consume() {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
            for (ConsumerRecord<String, String> record : records) {
                try {
                    processWithRetry(record);
                } catch (Exception e) {
                    errorHandler.handle(record, e);
                }
            }
            consumer.commitSync();
        }
    }
 
    private void processWithRetry(ConsumerRecord<String, String> record) throws Exception {
        int attempt = 0;
        Exception lastException = null;
 
        while (attempt < MAX_RETRIES) {
            try {
                process(record);
                return;  // Success
            } catch (RetryableException e) {
                lastException = e;
                attempt++;
                if (attempt < MAX_RETRIES) {
                    backoff(attempt);  // No point in waiting after the final attempt
                }
            } catch (NonRetryableException e) {
                throw e;  // Fail immediately
            }
        }
        throw new MaxRetriesExceededException(lastException);
    }
 
    private void backoff(int attempt) throws InterruptedException {
        long delay = (long) Math.pow(2, attempt) * 100;  // Exponential backoff: 200ms, 400ms, ...
        long jitter = (long) (Math.random() * 100);      // Add up to 100ms of jitter
        Thread.sleep(delay + jitter);
    }
}

Implementing an ErrorHandler

public interface ErrorHandler {
    void handle(ConsumerRecord<?, ?> record, Exception exception);
}
 
public class CompositeErrorHandler implements ErrorHandler {
    private final KafkaProducer<String, String> dlqProducer;
    private final AlertService alertService;
    private final MetricsService metrics;
 
    @Override
    public void handle(ConsumerRecord<?, ?> record, Exception exception) {
        // 1. Record metrics
        metrics.recordError(record.topic(), exception);
 
        // 2. Classify the error
        ErrorCategory category = classifyError(exception);
 
        // 3. Handle by category
        switch (category) {
            case DATA_ERROR:
                sendToDLQ(record, exception);
                break;
 
            case SYSTEM_ERROR:
                alertService.alert(AlertLevel.HIGH, exception);
                sendToDLQ(record, exception);
                break;
 
            case CRITICAL:
                alertService.alert(AlertLevel.CRITICAL, exception);
                throw new ProcessingException(exception);  // Stop processing
 
            default:
                sendToDLQ(record, exception);
        }
 
        // 4. Log
        log.error("Error processing record: topic={}, partition={}, offset={}, error={}",
            record.topic(), record.partition(), record.offset(), exception.getMessage());
    }
 
    // Takes Throwable: OutOfMemoryError is an Error, so it cannot be tested against an Exception reference
    private ErrorCategory classifyError(Throwable e) {
        if (e instanceof ValidationException) return ErrorCategory.DATA_ERROR;
        if (e instanceof OutOfMemoryError || e.getCause() instanceof OutOfMemoryError) return ErrorCategory.CRITICAL;
        if (e instanceof ServiceUnavailableException) return ErrorCategory.SYSTEM_ERROR;
        return ErrorCategory.UNKNOWN;
    }
}
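
The handler relies on a `sendToDLQ` method that is not shown. A DLQ entry is most useful when it carries enough context to find and replay the original record, so the sketch below builds that context as a set of headers. The header names and the `DlqEnvelope` class are hypothetical; the `-dlq` suffix follows the topic naming used in the policy example later in this document. In a real implementation these values would be attached to the `ProducerRecord` sent by `dlqProducer`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: the metadata a DLQ record could carry about the failed record.
final class DlqEnvelope {
    private DlqEnvelope() { }

    /** Derives the DLQ topic name from the original topic ({original-topic}-dlq). */
    static String dlqTopicFor(String originalTopic) {
        return originalTopic + "-dlq";
    }

    /** Builds headers describing where the record came from and why it failed. */
    static Map<String, String> headersFor(String topic, int partition, long offset, Throwable error) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("dlq.original.topic", topic);
        headers.put("dlq.original.partition", String.valueOf(partition));
        headers.put("dlq.original.offset", String.valueOf(offset));
        headers.put("dlq.error.class", error.getClass().getName());
        headers.put("dlq.error.message", String.valueOf(error.getMessage()));
        return headers;
    }
}
```

Keeping the original topic, partition, and offset in the headers lets an operator locate the source record without parsing the payload.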

Pause/Resume Strategy

public class PauseResumeConsumer {
    private final KafkaConsumer<String, String> consumer;
    // Paused partitions mapped to the time (epoch ms) at which they should resume
    private final Map<TopicPartition, Long> pausedPartitions = new HashMap<>();
 
    public void consume() {
        while (running) {
            resumeDuePartitions();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
 
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    try {
                        process(record);
                    } catch (TemporaryException e) {
                        // Rewind so the failed record is fetched again after the partition resumes
                        consumer.seek(partition, record.offset());
                        // Pause the partition temporarily
                        pausePartition(partition, Duration.ofMinutes(5));
                        break;  // Move on to the next partition
                    }
                }
            }
 
            // Commit only the partitions that are not paused
            commitNonPausedPartitions();
        }
    }
 
    private void pausePartition(TopicPartition partition, Duration delay) {
        consumer.pause(Collections.singleton(partition));
        pausedPartitions.put(partition, System.currentTimeMillis() + delay.toMillis());
        log.warn("Paused partition: {}", partition);
    }
 
    // KafkaConsumer is not thread-safe, so partitions are resumed on the polling
    // thread instead of from a scheduler thread
    private void resumeDuePartitions() {
        long now = System.currentTimeMillis();
        pausedPartitions.entrySet().removeIf(entry -> {
            if (entry.getValue() > now) {
                return false;
            }
            consumer.resume(Collections.singleton(entry.getKey()));
            log.info("Resumed partition: {}", entry.getKey());
            return true;
        });
    }
}

Retry Strategies

Exponential Backoff

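public class ExponentialBackoff {
    private final int maxRetries;
    private final long initialDelay;
    private final double multiplier;
    private final long maxDelay;
 
    public ExponentialBackoff(int maxRetries, long initialDelay, double multiplier, long maxDelay) {
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.multiplier = multiplier;
        this.maxDelay = maxDelay;
    }
 
    // attempt is 0-based: attempt 0 returns initialDelay
    public long getDelay(int attempt) {
        long delay = (long) (initialDelay * Math.pow(multiplier, attempt));
        return Math.min(delay, maxDelay);
    }
 
    public long getDelayWithJitter(int attempt) {
        long baseDelay = getDelay(attempt);
        long jitter = (long) (baseDelay * Math.random() * 0.3);  // Up to 30% jitter
        return baseDelay + jitter;
    }
}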
 
// Usage
ExponentialBackoff backoff = new ExponentialBackoff(
    5,       // maxRetries
    100,     // initialDelay (ms)
    2.0,     // multiplier
    30000    // maxDelay (ms)
);
 
// Retries: 100ms → 200ms → 400ms → 800ms → 1600ms (capped at 30 seconds)
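
To make the schedule concrete, the sketch below recomputes it as a list and bounds the jitter. It is a standalone illustration rather than the class above: `BackoffSchedule`, `delayFor`, `delayWithJitter`, and `schedule` are hypothetical names, and the `Random` is passed in so the jitter can be controlled in tests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: capped exponential backoff with optional jitter.
final class BackoffSchedule {
    private final long initialDelayMs;
    private final double multiplier;
    private final long maxDelayMs;

    BackoffSchedule(long initialDelayMs, double multiplier, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.multiplier = multiplier;
        this.maxDelayMs = maxDelayMs;
    }

    /** Delay before retry number attempt (0-based), capped at maxDelayMs. */
    long delayFor(int attempt) {
        double raw = initialDelayMs * Math.pow(multiplier, attempt);
        // Cap while still in double arithmetic so very large attempts cannot overflow
        return (long) Math.min(raw, (double) maxDelayMs);
    }

    /** Adds between 0% and 30% of the base delay as random jitter. */
    long delayWithJitter(int attempt, Random random) {
        long base = delayFor(attempt);
        return base + (long) (base * random.nextDouble() * 0.3);
    }

    /** Lists the base delays for the first `retries` attempts. */
    List<Long> schedule(int retries) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < retries; attempt++) {
            delays.add(delayFor(attempt));
        }
        return delays;
    }
}
```

With an initial delay of 100 ms, a multiplier of 2.0, and a 30000 ms cap, `schedule(5)` yields 100, 200, 400, 800, and 1600 ms, and the cap takes effect from attempt 9 onward, since 100 × 2^9 = 51200 exceeds 30000.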

Circuit Breaker

public class CircuitBreaker {
    private final int failureThreshold;
    private final long resetTimeout;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private volatile long lastFailureTime;
 
    public enum State { CLOSED, OPEN, HALF_OPEN }
 
    public <T> T execute(Supplier<T> operation) throws CircuitBreakerException {
        if (state.get() == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime > resetTimeout) {
                state.set(State.HALF_OPEN);
            } else {
                throw new CircuitBreakerException("Circuit is OPEN");
            }
        }
 
        try {
            T result = operation.get();
            reset();
            return result;
        } catch (Exception e) {
            recordFailure();
            throw e;
        }
    }
 
    private void recordFailure() {
        int failures = failureCount.incrementAndGet();
        lastFailureTime = System.currentTimeMillis();
 
        if (failures >= failureThreshold) {
            state.set(State.OPEN);
            log.warn("Circuit breaker opened after {} failures", failures);
        }
    }
 
    private void reset() {
        failureCount.set(0);
        state.set(State.CLOSED);
    }
}
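
The three states are easiest to follow when time can be controlled. The sketch below is a simplified variant of the breaker above with an injectable clock, so the CLOSED → OPEN → HALF_OPEN → CLOSED cycle can be exercised without sleeping. `SimpleCircuitBreaker`, its constructor parameters, and the use of `IllegalStateException` for rejected calls are assumptions made for this illustration.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: a circuit breaker whose clock is injected for testability.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long resetTimeoutMs;
    private final LongSupplier clock;  // returns the current time in milliseconds

    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long resetTimeoutMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
        this.clock = clock;
    }

    synchronized State state() {
        return state;
    }

    <T> T execute(Supplier<T> operation) {
        synchronized (this) {
            if (state == State.OPEN) {
                if (clock.getAsLong() - openedAt >= resetTimeoutMs) {
                    state = State.HALF_OPEN;  // let one trial call through
                } else {
                    throw new IllegalStateException("Circuit is OPEN");
                }
            }
        }
        try {
            T result = operation.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            throw e;
        }
    }

    private synchronized void onSuccess() {
        failures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        failures++;
        // A failed trial call in HALF_OPEN reopens the circuit immediately
        if (state == State.HALF_OPEN || failures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }
}
```

In production code the clock would be `System::currentTimeMillis`; a test can pass a lambda that reads a mutable value and advance it past the reset timeout.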

Resilience4j Integration

// Resilience4j configuration
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
    .failureRateThreshold(50)
    .waitDurationInOpenState(Duration.ofSeconds(30))
    .slidingWindowSize(10)
    .build();
 
RetryConfig retryConfig = RetryConfig.custom()
    .maxAttempts(3)
    .waitDuration(Duration.ofMillis(500))
    .retryExceptions(TimeoutException.class, IOException.class)
    .build();
 
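// Create these once and reuse them: a circuit breaker built on every call
// never accumulates failures, so it would never open
private final Retry retry = Retry.of("kafka-consumer", retryConfig);
private final CircuitBreaker circuitBreaker = CircuitBreaker.of("external-service", config);
 
// Apply to the consumer
public void processWithResilience(ConsumerRecord<String, String> record) {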
 
    Supplier<Void> decoratedSupplier = Decorators.ofSupplier(() -> {
        externalService.call(record.value());
        return null;
    })
    .withRetry(retry)
    .withCircuitBreaker(circuitBreaker)
    .decorate();
 
    try {
        decoratedSupplier.get();
    } catch (Exception e) {
        sendToDLQ(record, e);
    }
}

Commit Strategies and Errors

At-Least-Once Guarantee

public void consumeAtLeastOnce() {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                try {
                    // 1. Process
                    process(record);
 
                    // 2. Commit only after processing succeeds
                    consumer.commitSync(Collections.singletonMap(
                        partition,
                        new OffsetAndMetadata(record.offset() + 1)
                    ));
                } catch (Exception e) {
                    // Not committed. Rewind the position as well, otherwise the
                    // next poll() continues past the failed record.
                    handleError(record, e);
                    consumer.seek(partition, record.offset());
                    break;  // Stop this partition to preserve ordering
                }
            }
        }
    }
}

Exactly-Once (Transactions)

public void consumeExactlyOnce() {
    producer.initTransactions();
 
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
        try {
            producer.beginTransaction();
 
            for (ConsumerRecord<String, String> record : records) {
                // Send the processing result to another topic
                ProducerRecord<String, String> output = process(record);
                producer.send(output);
            }
 
            // Include the consumer offsets in the transaction
            producer.sendOffsetsToTransaction(
                getOffsets(records),
                consumer.groupMetadata()
            );
 
            producer.commitTransaction();
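        } catch (Exception e) {
            // Note: fatal errors such as ProducerFencedException require closing
            // the producer instead of aborting
            producer.abortTransaction();
            // Aborting does not move the consumer's position, so rewind each
            // partition to the start of this batch; the next poll() re-reads it
            for (TopicPartition tp : records.partitions()) {
                consumer.seek(tp, records.records(tp).get(0).offset());
            }
        }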
    }
}
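
The transactional loop calls a `getOffsets(records)` helper that is not shown. The offset to commit for each partition is the offset of the last processed record plus one, because a committed offset names the next record to read. The sketch below shows that calculation with plain JDK types. `OffsetTracker`, `Rec`, and `nextOffsets` are hypothetical; a real helper would return a `Map<TopicPartition, OffsetAndMetadata>` instead of the string-keyed map used here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: computes the offset to commit for each partition in a batch.
final class OffsetTracker {
    private OffsetTracker() { }

    /** Minimal stand-in for the topic, partition, and offset of a ConsumerRecord. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Maps "topic-partition" to (highest offset seen + 1), the position of the next record. */
    static Map<String, Long> nextOffsets(List<Rec> records) {
        Map<String, Long> next = new HashMap<>();
        for (Rec record : records) {
            String key = record.topic + "-" + record.partition;
            next.merge(key, record.offset + 1, Long::max);
        }
        return next;
    }
}
```

For a batch containing offsets 5 and 6 on partition 0 and offset 9 on partition 1, the result is 7 for partition 0 and 10 for partition 1.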

Monitoring and Alerting

Error Metrics

public class ErrorMetrics {
    private final MeterRegistry registry;
 
    public void recordProcessingError(String topic, String errorType) {
        registry.counter("kafka.consumer.errors",
            "topic", topic,
            "error_type", errorType
        ).increment();
    }
 
    public void recordRetry(String topic, int attempt) {
        registry.counter("kafka.consumer.retries",
            "topic", topic,
            "attempt", String.valueOf(attempt)
        ).increment();
    }
 
    public void recordDLQ(String topic) {
        registry.counter("kafka.dlq.messages",
            "original_topic", topic
        ).increment();
    }
}

Alert Rules

alerts:
  - name: HighErrorRate
    condition: rate(kafka_consumer_errors_total[5m]) > 10
    severity: warning
    action: "Check consumer logs and external dependencies"
 
  - name: RetryStorm
    condition: rate(kafka_consumer_retries_total[1m]) > 100
    severity: high
    action: "Possible external service issue, check dependencies"
 
  - name: DLQSpike
    condition: increase(kafka_dlq_messages_total[1h]) > 50
    severity: warning
    action: "Review DLQ messages for patterns"
 
  - name: CircuitBreakerOpen
    condition: circuit_breaker_state == 2  # OPEN
    severity: high
    action: "External service degraded, review dependency"

Best Practices

1. Error Classification Scheme

public enum ErrorSeverity {
    LOW,       // Log only
    MEDIUM,    // DLQ + metrics
    HIGH,      // DLQ + alert
    CRITICAL   // Stop processing + immediate alert
}
 
// Takes Throwable: OutOfMemoryError is an Error, not an Exception
public ErrorSeverity classify(Throwable e) {
    if (e instanceof ValidationException) return ErrorSeverity.LOW;
    if (e instanceof ServiceUnavailableException) return ErrorSeverity.MEDIUM;
    if (e instanceof DataCorruptionException) return ErrorSeverity.HIGH;
    if (e instanceof OutOfMemoryError) return ErrorSeverity.CRITICAL;
    return ErrorSeverity.MEDIUM;
}

2. Defensive Programming

public void process(ConsumerRecord<String, String> record) {
    // Null check
    if (record.value() == null) {
        log.warn("Null value received, skipping");
        return;
    }
 
    // Parse validation
    Message message;
    try {
        message = objectMapper.readValue(record.value(), Message.class);
    } catch (JsonProcessingException e) {
        sendToDLQ(record, e);
        return;
    }
 
    // Business validation
    ValidationResult validation = validator.validate(message);
    if (!validation.isValid()) {
        log.warn("Validation failed: {}", validation.getErrors());
        sendToDLQ(record, new ValidationException(validation));
        return;
    }
 
    // Process
    businessLogic.process(message);
}

3. Testing Strategy

@Test
void shouldHandleTimeoutException() {
    // Given
    when(externalService.call(any())).thenThrow(new TimeoutException());
 
    // When
    consumer.process(testRecord);
 
    // Then
    verify(retryQueue).add(any());  // Added to the retry queue
}
 
@Test
void shouldSendToDLQOnNonRetryableError() {
    // Given
    when(parser.parse(any())).thenThrow(new ParseException());
 
    // When
    consumer.process(testRecord);
 
    // Then
    verify(dlqProducer).send(any());  // Sent to the DLQ
}
 
@Test
void shouldOpenCircuitBreakerAfterThreshold() {
    // Given (a single stubbing applies to every subsequent call)
    when(service.call(any())).thenThrow(new ServiceException());
 
    // When
    IntStream.range(0, 10).forEach(i -> consumer.process(testRecord));
 
    // Then
    assertThat(circuitBreaker.getState()).isEqualTo(State.OPEN);
}

4. Documentation

## Error Handling Policy
 
### Retry Policy
- Max retries: 3
- Backoff: Exponential (100ms, 200ms, 400ms)
- Retryable: TimeoutException, ConnectionException
 
### DLQ Policy
- Topic naming: {original-topic}-dlq
- Retention: 30 days
- Monitoring: Daily review
 
### Alert Policy
| Error Type | Severity | Response |
|------------|----------|----------|
| Transient  | Low      | Auto-retry |
| Data Error | Medium   | DLQ + Log |
| System     | High     | Alert + DLQ |
| Critical   | Critical | Page + Stop |

Related Documents