Error Handling 전략
Kafka 기반 시스템에서 발생할 수 있는 다양한 오류를 효과적으로 처리하는 전략을 살펴봅니다.
오류 유형
오류 분류
┌─────────────────────────────────────────────────────────────────┐
│ Error Categories │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Transient Errors (일시적) │ │
│ │ - 네트워크 타임아웃 │ │
│ │ - 브로커 일시 불가 │ │
│ │ - 리소스 부족 (일시적) │ │
│ │ → 재시도로 해결 가능 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Recoverable Errors (복구 가능) │ │
│ │ - 외부 서비스 장애 │ │
│ │ - 데이터베이스 연결 실패 │ │
│ │ - Rate limiting │ │
│ │ → 백오프 + 재시도로 해결 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Non-Recoverable Errors (복구 불가) │ │
│ │ - 데이터 형식 오류 │ │
│ │ - 비즈니스 규칙 위반 │ │
│ │ - 스키마 불일치 │ │
│ │ → DLQ로 분리 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
오류 소스별 분류
| 소스 | 예시 | 처리 전략 |
|---|---|---|
| Kafka | LeaderNotAvailable, NotEnoughReplicas | 재시도 |
| 직렬화 | SerializationException | DLQ |
| 비즈니스 로직 | ValidationException | DLQ 또는 수정 |
| 외부 시스템 | ServiceUnavailable, Timeout | 재시도 + 백오프 |
| 인프라 | OutOfMemory, DiskFull | 알림 + 중단 |
Producer 에러 처리
전송 실패 처리
public class ResilientProducer {
private final KafkaProducer<String, String> producer;
private final KafkaProducer<String, String> fallbackProducer; // 폴백
/**
 * Sends a record asynchronously. The completion callback either logs the
 * assigned partition/offset or forwards the failure to
 * {@code handleProducerError}.
 *
 * @param topic destination topic
 * @param key   record key
 * @param value record value
 */
public void send(String topic, String key, String value) {
    final ProducerRecord<String, String> outgoing = new ProducerRecord<>(topic, key, value);
    producer.send(outgoing, (metadata, failure) -> {
        if (failure == null) {
            log.debug("Sent to partition {} offset {}",
                    metadata.partition(), metadata.offset());
            return;
        }
        handleProducerError(outgoing, failure);
    });
}
/**
 * Decides what to do with a record whose delivery failed.
 *
 * <ul>
 *   <li>retryable failure: the record is queued for a later retry (attempt 1);</li>
 *   <li>recoverable failure: the record is handed to the fallback producer;</li>
 *   <li>anything else: the record is stored locally and operations are alerted.</li>
 * </ul>
 *
 * <p>The fallback send is asynchronous as well, so it gets its own callback:
 * without one a failure on the fallback path would be dropped silently and the
 * record lost.
 *
 * @param record    the record that could not be delivered
 * @param exception the delivery failure reported for it
 */
private void handleProducerError(ProducerRecord<String, String> record,
                                 Exception exception) {
    if (isRetryable(exception)) {
        // Queue for retry
        retryQueue.add(new RetryRecord(record, 1));
    } else if (isRecoverable(exception)) {
        // Use the fallback producer; if that fails too, fall through to the last resort
        fallbackProducer.send(record, (metadata, fallbackFailure) -> {
            if (fallbackFailure != null) {
                saveToLocalStorage(record);
                alertOps(fallbackFailure);
            }
        });
    } else {
        // Last resort: persist locally and alert
        saveToLocalStorage(record);
        alertOps(exception);
    }
}
/**
 * Tells whether a delivery failure is one of the exception types this class
 * treats as retryable.
 *
 * @param e the delivery failure
 * @return {@code true} for timeout, not-enough-replicas and leader-not-available failures
 */
private boolean isRetryable(Exception e) {
    if (e instanceof TimeoutException) {
        return true;
    }
    if (e instanceof NotEnoughReplicasException) {
        return true;
    }
    return e instanceof LeaderNotAvailableException;
}
}
Producer 설정 최적화
// Producer configuration used by the examples in this section.
Properties props = new Properties();
// Retry settings
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("delivery.timeout.ms", 120000); // total time allowed for a delivery
// Idempotence (prevents duplicates)
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
// Reliability
props.put("acks", "all");
배치 전송 에러 처리
public class BatchProducer {
/**
 * Sends every message, waits up to 10 seconds for each acknowledgement and
 * passes the messages that were not confirmed to {@code handleFailedBatch}.
 *
 * <p>If the calling thread is interrupted while waiting, the interrupt flag is
 * restored and every message whose outcome is still unknown is reported as
 * failed, instead of the interrupt being swallowed by a blanket
 * {@code catch (Exception)}.
 *
 * @param messages the messages to send, in order
 */
public void sendBatch(List<Message> messages) {
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    for (Message msg : messages) {
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<>("topic", msg.getKey(), msg.getValue())
        );
        futures.add(future);
    }

    // Collect results; futures.get(i) belongs to messages.get(i)
    List<Message> failed = new ArrayList<>();
    for (int i = 0; i < futures.size(); i++) {
        try {
            futures.get(i).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting: every
            // remaining get() would fail immediately with the flag set anyway.
            Thread.currentThread().interrupt();
            failed.addAll(messages.subList(i, messages.size()));
            break;
        } catch (Exception e) {
            failed.add(messages.get(i));
        }
    }

    // Handle the failed messages
    if (!failed.isEmpty()) {
        handleFailedBatch(failed);
    }
}
}
Consumer 에러 처리
처리 실패 전략
public class ResilientConsumer {
private final KafkaConsumer<String, String> consumer;
private final ErrorHandler errorHandler;
private static final int MAX_RETRIES = 3;
/**
 * Poll loop: processes each record with retries, hands failures to the
 * {@code errorHandler} and commits after the batch.
 *
 * <p>An {@link InterruptedException} (raised while backing off between
 * retries) is not a record failure. The interrupt flag is restored and the
 * loop exits without committing, so the unfinished batch is redelivered
 * rather than routed to the error handler. Empty polls skip the commit.
 */
public void consume() {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            continue; // nothing processed, nothing to commit
        }
        for (ConsumerRecord<String, String> record : records) {
            try {
                processWithRetry(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                errorHandler.handle(record, e);
            }
        }
        consumer.commitSync();
    }
}
/**
 * Processes a record, retrying on {@code RetryableException} up to
 * {@code MAX_RETRIES} attempts with a backoff between attempts.
 *
 * <p>No backoff is performed after the final failed attempt: sleeping there
 * only delays the failure report without giving the record another chance.
 *
 * @param record the record to process
 * @throws NonRetryableException      immediately, without further attempts
 * @throws MaxRetriesExceededException when every attempt failed; wraps the last failure
 * @throws InterruptedException       if interrupted while backing off
 */
private void processWithRetry(ConsumerRecord<String, String> record) throws Exception {
    Exception lastException = null;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            process(record);
            return; // success
        } catch (RetryableException e) {
            lastException = e;
            if (attempt < MAX_RETRIES) {
                backoff(attempt);
            }
        } catch (NonRetryableException e) {
            throw e; // fail immediately
        }
    }
    throw new MaxRetriesExceededException(lastException);
}
/**
 * Sleeps for {@code 2^attempt * 100} ms plus a random jitter below 100 ms.
 *
 * @param attempt number of the attempt that just failed
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void backoff(int attempt) throws InterruptedException {
    final long exponentialMillis = (long) Math.pow(2, attempt) * 100;
    final long jitterMillis = (long) (Math.random() * 100);
    final long totalMillis = exponentialMillis + jitterMillis;
    Thread.sleep(totalMillis);
}
}
ErrorHandler 구현
/**
 * Callback invoked for a record whose processing ended in an exception.
 */
public interface ErrorHandler {
/**
 * Handles a failed record.
 *
 * @param record    the record that was being processed
 * @param exception the failure raised while processing it
 */
void handle(ConsumerRecord<?, ?> record, Exception exception);
}
public class CompositeErrorHandler implements ErrorHandler {
private final KafkaProducer<String, String> dlqProducer;
private final AlertService alertService;
private final MetricsService metrics;
@Override
public void handle(ConsumerRecord<?, ?> record, Exception exception) {
// 1. 메트릭 기록
metrics.recordError(record.topic(), exception);
// 2. 에러 분류
ErrorCategory category = classifyError(exception);
// 3. 카테고리별 처리
switch (category) {
case DATA_ERROR:
sendToDLQ(record, exception);
break;
case SYSTEM_ERROR:
alertService.alert(AlertLevel.HIGH, exception);
sendToDLQ(record, exception);
break;
case CRITICAL:
alertService.alert(AlertLevel.CRITICAL, exception);
throw new ProcessingException(exception); // 처리 중단
default:
sendToDLQ(record, exception);
}
// 4. 로깅
log.error("Error processing record: topic={}, partition={}, offset={}, error={}",
record.topic(), record.partition(), record.offset(), exception.getMessage());
}
/**
 * Maps an exception to an {@code ErrorCategory}.
 *
 * <p>{@link OutOfMemoryError} is an {@code Error}, not an {@code Exception},
 * so {@code e instanceof OutOfMemoryError} does not compile for an
 * {@code Exception} parameter. It can only reach this method wrapped as a
 * cause, so the cause chain is inspected instead.
 *
 * @param e the failure to classify
 * @return the category, or {@code UNKNOWN} when nothing matches
 */
private ErrorCategory classifyError(Exception e) {
    if (e instanceof ValidationException) {
        return ErrorCategory.DATA_ERROR;
    }
    // Bounded walk so a malformed (cyclic) cause chain cannot loop forever.
    final int maxDepth = 32;
    Throwable cause = e.getCause();
    for (int depth = 0; cause != null && depth < maxDepth; depth++) {
        if (cause instanceof OutOfMemoryError) {
            return ErrorCategory.CRITICAL;
        }
        cause = cause.getCause();
    }
    if (e instanceof ServiceUnavailableException) {
        return ErrorCategory.SYSTEM_ERROR;
    }
    return ErrorCategory.UNKNOWN;
}
}
Pause/Resume 전략
public class PauseResumeConsumer {
private final KafkaConsumer<String, String> consumer;
private final Set<TopicPartition> pausedPartitions = new HashSet<>();
// Partitions whose pause window has elapsed. Written by the scheduler thread,
// drained by the polling thread: KafkaConsumer is not safe for multi-threaded
// access, so the scheduler must never call consumer.resume() itself.
private final java.util.Queue<TopicPartition> resumeRequests =
        new java.util.concurrent.ConcurrentLinkedQueue<>();

/**
 * Poll loop that pauses a partition for five minutes when one of its records
 * fails with a {@code TemporaryException}.
 *
 * <p>On failure the partition is rewound to the failed record before it is
 * paused. poll() has already advanced the position past the whole fetched
 * batch, so without the seek the failed record and everything after it in the
 * batch would be skipped once the partition is resumed.
 */
public void consume() {
    while (running) {
        resumeDuePartitions();
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                try {
                    process(record);
                } catch (TemporaryException e) {
                    // Re-read from the failed record after the pause
                    consumer.seek(partition, record.offset());
                    pausePartition(partition);
                    scheduleResume(partition, Duration.ofMinutes(5));
                    break; // move on to the next partition
                }
            }
        }
        // Commit only the partitions that are not paused
        commitNonPausedPartitions();
    }
}

/** Pauses fetching for the partition and remembers it as paused. */
private void pausePartition(TopicPartition partition) {
    consumer.pause(Collections.singleton(partition));
    pausedPartitions.add(partition);
    log.warn("Paused partition: {}", partition);
}

/**
 * Schedules the partition to be resumed after {@code delay}. The scheduled
 * task only enqueues a request; the actual resume happens on the polling
 * thread in {@code resumeDuePartitions}.
 */
private void scheduleResume(TopicPartition partition, Duration delay) {
    scheduler.schedule(() -> {
        resumeRequests.add(partition);
    }, delay.toMillis(), TimeUnit.MILLISECONDS);
}

/** Resumes every partition whose pause window has elapsed; polling thread only. */
private void resumeDuePartitions() {
    TopicPartition partition;
    while ((partition = resumeRequests.poll()) != null) {
        // The partition may have been revoked while paused; resume() would throw.
        if (consumer.assignment().contains(partition)) {
            consumer.resume(Collections.singleton(partition));
        }
        pausedPartitions.remove(partition);
        log.info("Resumed partition: {}", partition);
    }
}
}
재시도 전략
지수 백오프
/**
 * Computes exponential backoff delays: {@code initialDelay * multiplier^attempt},
 * capped at {@code maxDelay}. All delays are in milliseconds.
 */
public class ExponentialBackoff {
    private final int maxRetries;
    private final long initialDelay;
    private final double multiplier;
    private final long maxDelay;

    /**
     * Creates a backoff policy.
     *
     * @param maxRetries   maximum number of retries; must not be negative
     * @param initialDelay delay for attempt 0 in ms; must not be negative
     * @param multiplier   growth factor per attempt; must be at least 1.0
     * @param maxDelay     upper bound for any delay in ms; must be at least {@code initialDelay}
     * @throws IllegalArgumentException if an argument is out of range
     */
    public ExponentialBackoff(int maxRetries, long initialDelay, double multiplier, long maxDelay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0: " + maxRetries);
        }
        if (initialDelay < 0) {
            throw new IllegalArgumentException("initialDelay must be >= 0: " + initialDelay);
        }
        // Written as a negated >= so that NaN is rejected as well.
        if (!(multiplier >= 1.0)) {
            throw new IllegalArgumentException("multiplier must be >= 1.0: " + multiplier);
        }
        if (maxDelay < initialDelay) {
            throw new IllegalArgumentException(
                    "maxDelay must be >= initialDelay: " + maxDelay + " < " + initialDelay);
        }
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.multiplier = multiplier;
        this.maxDelay = maxDelay;
    }

    /** @return the maximum number of retries this policy was configured with */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Returns the delay for the given attempt without jitter.
     *
     * @param attempt zero-based attempt number (0 yields {@code initialDelay})
     * @return the delay in ms, never more than {@code maxDelay}
     */
    public long getDelay(int attempt) {
        // A double too large for long saturates at Long.MAX_VALUE when cast,
        // so the min() below still applies the cap for huge attempt numbers.
        long delay = (long) (initialDelay * Math.pow(multiplier, attempt));
        return Math.min(delay, maxDelay);
    }

    /**
     * Returns the delay for the given attempt plus up to 30% random jitter,
     * still capped at {@code maxDelay}.
     *
     * @param attempt zero-based attempt number
     * @return the jittered delay in ms, between the base delay and {@code maxDelay}
     */
    public long getDelayWithJitter(int attempt) {
        long baseDelay = getDelay(attempt);
        long jitter = (long) (baseDelay * Math.random() * 0.3); // up to 30% jitter
        // Clamp the jitter to the remaining headroom (also avoids overflow on the sum).
        long headroom = maxDelay - baseDelay;
        return baseDelay + Math.min(jitter, headroom);
    }
}
// Usage
ExponentialBackoff backoff = new ExponentialBackoff(
5, // maxRetries
100, // initialDelay (ms)
2.0, // multiplier
30000 // maxDelay (ms)
);
// 재시도: 100ms → 200ms → 400ms → 800ms → 1600ms (최대 30초)
Circuit Breaker
public class CircuitBreaker {
private final int failureThreshold;
private final long resetTimeout;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
private volatile long lastFailureTime;
public enum State { CLOSED, OPEN, HALF_OPEN }
/**
 * Runs the operation through the breaker.
 *
 * <p>While OPEN, calls are rejected until more than {@code resetTimeout} ms
 * have passed since the last recorded failure; after that the breaker moves
 * to HALF_OPEN and the call is let through. A successful call resets the
 * breaker; a failing call is recorded and its exception rethrown.
 *
 * @param operation the call to protect
 * @param <T>       result type of the operation
 * @return the operation's result
 * @throws CircuitBreakerException if the breaker is OPEN and the reset timeout has not elapsed
 */
public <T> T execute(Supplier<T> operation) throws CircuitBreakerException {
    final boolean currentlyOpen = state.get() == State.OPEN;
    if (currentlyOpen) {
        final long sinceLastFailure = System.currentTimeMillis() - lastFailureTime;
        if (sinceLastFailure <= resetTimeout) {
            throw new CircuitBreakerException("Circuit is OPEN");
        }
        state.set(State.HALF_OPEN);
    }

    try {
        final T outcome = operation.get();
        reset();
        return outcome;
    } catch (Exception failure) {
        recordFailure();
        throw failure;
    }
}
/**
 * Counts one failure, stamps its time, and opens the breaker once the
 * failure count has reached {@code failureThreshold}.
 */
private void recordFailure() {
    final int failuresSoFar = failureCount.incrementAndGet();
    lastFailureTime = System.currentTimeMillis();
    if (failuresSoFar < failureThreshold) {
        return;
    }
    state.set(State.OPEN);
    log.warn("Circuit breaker opened after {} failures", failuresSoFar);
}
/**
 * Clears the failure counter and puts the breaker back into CLOSED.
 * Called by {@code execute} after a successful operation.
 */
private void reset() {
failureCount.set(0);
state.set(State.CLOSED);
}
}
Resilience4j 통합
// Resilience4j configuration
// Circuit breaker: failure-rate threshold 50, 30 s wait in the open state, sliding window of 10.
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(10)
.build();
// Retry: at most 3 attempts, 500 ms between them, only for TimeoutException and IOException.
RetryConfig retryConfig = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofMillis(500))
.retryExceptions(TimeoutException.class, IOException.class)
.build();
// Applied to the consumer.
//
// The Retry and CircuitBreaker instances are created once and reused. A
// circuit breaker keeps its failure window inside the instance, so building a
// new one for every record (as calling CircuitBreaker.of(...) inside the
// method does) starts from an empty window each time and can never open.
// NOTE(review): assumes `retryConfig` and `config` are fields visible here — confirm.
private final Retry retry = Retry.of("kafka-consumer", retryConfig);
private final CircuitBreaker circuitBreaker = CircuitBreaker.of("external-service", config);

/**
 * Calls the external service for one record with retry and circuit-breaker
 * protection; a record that still fails is sent to the DLQ.
 *
 * @param record the record whose value is passed to the external service
 */
public void processWithResilience(ConsumerRecord<String, String> record) {
    Supplier<Void> decoratedSupplier = Decorators.ofSupplier(() -> {
                externalService.call(record.value());
                return null;
            })
            .withRetry(retry)
            .withCircuitBreaker(circuitBreaker)
            .decorate();
    try {
        decoratedSupplier.get();
    } catch (Exception e) {
        // NOTE(review): this also catches rejections raised while the breaker
        // is open, so healthy records go to the DLQ during an outage — confirm
        // that is intended.
        sendToDLQ(record, e);
    }
}
Commit 전략과 에러
At-Least-Once 보장
/**
 * At-least-once loop: each record's offset is committed only after the
 * record was processed successfully.
 *
 * <p>Not committing is not enough to get a record redelivered. poll() has
 * already moved the in-memory position past the whole fetched batch, so on
 * failure the partition is rewound to the failed record with seek(). Records
 * are handled per partition, so a failure stops only that partition (the
 * scope in which Kafka guarantees order) and other partitions' fetched
 * records are not dropped.
 *
 * <p>NOTE(review): a record that fails permanently is re-read on every poll;
 * {@code handleError} is presumably responsible for backing off or diverting
 * it — confirm.
 */
public void consumeAtLeastOnce() {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                try {
                    // 1. Process
                    process(record);
                    // 2. Commit only after successful processing
                    consumer.commitSync(Collections.singletonMap(
                            partition,
                            new OffsetAndMetadata(record.offset() + 1)
                    ));
                } catch (Exception e) {
                    handleError(record, e);
                    // Rewind so the next poll() returns this record again. The
                    // partition may have been revoked (e.g. a failed commit
                    // after a rebalance), in which case seek() would throw.
                    if (consumer.assignment().contains(partition)) {
                        consumer.seek(partition, record.offset());
                    }
                    break; // stop this partition to preserve ordering
                }
            }
        }
    }
}
Exactly-Once (트랜잭션)
/**
 * Consume-transform-produce loop in which the output records and the consumed
 * offsets are committed in one producer transaction.
 *
 * <p>Two failure paths are distinguished:
 * <ul>
 *   <li>Fatal producer errors (fenced producer, out-of-order sequence,
 *       authorization): the transaction cannot be aborted and the producer
 *       cannot be reused, so it is closed and the error rethrown.</li>
 *   <li>Any other error: the transaction is aborted and every partition of the
 *       batch is rewound to its first record. Without the seek the next poll()
 *       would continue after the aborted batch and its records would be
 *       skipped.</li>
 * </ul>
 * Empty polls do not open a transaction.
 */
public void consumeExactlyOnce() {
    producer.initTransactions();
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            continue;
        }
        try {
            producer.beginTransaction();
            for (ConsumerRecord<String, String> record : records) {
                // Send the processing result to another topic
                ProducerRecord<String, String> output = process(record);
                producer.send(output);
            }
            // Include the consumer offsets in the transaction
            producer.sendOffsetsToTransaction(
                    getOffsets(records),
                    consumer.groupMetadata()
            );
            producer.commitTransaction();
        } catch (org.apache.kafka.common.errors.ProducerFencedException
                | org.apache.kafka.common.errors.OutOfOrderSequenceException
                | org.apache.kafka.common.errors.AuthorizationException e) {
            // Unrecoverable for this producer instance: close, do not abort.
            producer.close();
            throw e;
        } catch (Exception e) {
            producer.abortTransaction();
            // Rewind so the aborted batch is fetched again by the next poll()
            for (TopicPartition partition : records.partitions()) {
                consumer.seek(partition, records.records(partition).get(0).offset());
            }
        }
    }
}
모니터링과 알림
에러 메트릭
public class ErrorMetrics {
private final MeterRegistry registry;
/**
 * Increments the {@code kafka.consumer.errors} counter, tagged with the topic
 * and the error type.
 *
 * @param topic     value of the {@code topic} tag
 * @param errorType value of the {@code error_type} tag
 */
public void recordProcessingError(String topic, String errorType) {
registry.counter("kafka.consumer.errors",
"topic", topic,
"error_type", errorType
).increment();
}
/**
 * Increments the {@code kafka.consumer.retries} counter, tagged with the
 * topic and the attempt number (as a string).
 *
 * @param topic   value of the {@code topic} tag
 * @param attempt attempt number, used as the {@code attempt} tag value
 */
public void recordRetry(String topic, int attempt) {
registry.counter("kafka.consumer.retries",
"topic", topic,
"attempt", String.valueOf(attempt)
).increment();
}
/**
 * Increments the {@code kafka.dlq.messages} counter, tagged with the topic
 * the message originally came from.
 *
 * @param topic value of the {@code original_topic} tag
 */
public void recordDLQ(String topic) {
registry.counter("kafka.dlq.messages",
"original_topic", topic
).increment();
}
}
알림 규칙
alerts:
- name: HighErrorRate
condition: rate(kafka_consumer_errors_total[5m]) > 10
severity: warning
action: "Check consumer logs and external dependencies"
- name: RetryStorm
condition: rate(kafka_consumer_retries_total[1m]) > 100
severity: high
action: "Possible external service issue, check dependencies"
- name: DLQSpike
condition: increase(kafka_dlq_messages_total[1h]) > 50
severity: warning
action: "Review DLQ messages for patterns"
- name: CircuitBreakerOpen
condition: circuit_breaker_state == 2 # OPEN
severity: high
action: "External service degraded, review dependency"
Best Practices
1. 에러 분류 체계
/** Severity levels for processing errors, from least to most severe. */
public enum ErrorSeverity {
LOW, // log only
MEDIUM, // DLQ + metrics
HIGH, // DLQ + alert
CRITICAL // stop processing + immediate alert
}
/**
 * Maps an exception to an {@code ErrorSeverity}; anything unrecognized is
 * {@code MEDIUM}.
 *
 * <p>{@link OutOfMemoryError} is an {@code Error}, not an {@code Exception},
 * so {@code e instanceof OutOfMemoryError} does not compile for an
 * {@code Exception} parameter. It can only arrive here wrapped as a cause, so
 * the cause chain is inspected instead.
 *
 * @param e the failure to classify
 * @return the severity for the failure
 */
public ErrorSeverity classify(Exception e) {
    if (e instanceof ValidationException) {
        return ErrorSeverity.LOW;
    }
    if (e instanceof ServiceUnavailableException) {
        return ErrorSeverity.MEDIUM;
    }
    if (e instanceof DataCorruptionException) {
        return ErrorSeverity.HIGH;
    }
    // Bounded walk so a malformed (cyclic) cause chain cannot loop forever.
    final int maxDepth = 32;
    Throwable cause = e.getCause();
    for (int depth = 0; cause != null && depth < maxDepth; depth++) {
        if (cause instanceof OutOfMemoryError) {
            return ErrorSeverity.CRITICAL;
        }
        cause = cause.getCause();
    }
    return ErrorSeverity.MEDIUM;
}
2. 방어적 프로그래밍
/**
 * Defensive processing of one record: null values are skipped, unparseable
 * or invalid payloads are sent to the DLQ, and only validated messages reach
 * the business logic.
 *
 * @param record the record to process
 */
public void process(ConsumerRecord<String, String> record) {
    // Null check
    final String payload = record.value();
    if (payload == null) {
        log.warn("Null value received, skipping");
        return;
    }

    // Parsing
    final Message parsed;
    try {
        parsed = objectMapper.readValue(payload, Message.class);
    } catch (JsonProcessingException e) {
        sendToDLQ(record, e);
        return;
    }

    // Business validation, then processing
    final ValidationResult outcome = validator.validate(parsed);
    if (outcome.isValid()) {
        businessLogic.process(parsed);
        return;
    }
    log.warn("Validation failed: {}", outcome.getErrors());
    sendToDLQ(record, new ValidationException(outcome));
}
3. 테스트 전략
/** A timeout from the external service should put the record on the retry queue. */
@Test
void shouldHandleTimeoutException() {
// Given
when(externalService.call(any())).thenThrow(new TimeoutException());
// When
consumer.process(testRecord);
// Then
verify(retryQueue).add(any()); // added to the retry queue
}
/** A parse failure should send the record to the DLQ. */
@Test
void shouldSendToDLQOnNonRetryableError() {
// Given
when(parser.parse(any())).thenThrow(new ParseException());
// When
consumer.process(testRecord);
// Then
verify(dlqProducer).send(any()); // sent to the DLQ
}
/**
 * Ten consecutive failures of the service should leave the circuit breaker OPEN.
 *
 * <p>The stub is set up once. A {@code thenThrow} stubbing applies to every
 * later call, and repeating {@code when(service.call(...))} on a stub that
 * already throws would invoke the stub and raise the exception during setup.
 */
@Test
void shouldOpenCircuitBreakerAfterThreshold() {
    // Given
    when(service.call(any())).thenThrow(new ServiceException());
    // When
    IntStream.range(0, 10).forEach(i -> consumer.process(testRecord));
    // Then
    assertThat(circuitBreaker.getState()).isEqualTo(State.OPEN);
}
4. 문서화
## Error Handling Policy
### Retry Policy
- Max retries: 3
- Backoff: Exponential (100ms, 200ms, 400ms)
- Retryable: TimeoutException, ConnectionException
### DLQ Policy
- Topic naming: {original-topic}-dlq
- Retention: 30 days
- Monitoring: Daily review
### Alert Policy
| Error Type | Severity | Response |
|------------|----------|----------|
| Transient | Low | Auto-retry |
| Data Error | Medium | DLQ + Log |
| System | High | Alert + DLQ |
| Critical | Critical | Page + Stop |
댓글 (0)