메시지 전송 방식

Kafka Producer는 세 가지 메시지 전송 방식을 제공합니다. 각 방식은 성능, 신뢰성, 복잡성 측면에서 서로 다른 특성을 가지고 있습니다.

1. Fire-and-Forget (발사 후 망각)

개념

메시지를 전송하고 결과를 확인하지 않는 방식입니다.

특징

  • 최고 성능: 응답을 기다리지 않으므로 처리량이 가장 높음
  • 최저 신뢰성: 메시지 손실 가능성이 있음
  • 가장 간단: 코드가 단순하고 구현이 쉬움

사용 사례

  • 메시지 손실이 허용되는 경우
  • 로그 수집, 메트릭 수집 등
  • 높은 처리량이 중요한 경우

구현 예제

Java

ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");
 
// 전송 후 결과를 확인하지 않음
producer.send(record);

Python

# 전송 후 결과를 확인하지 않음
producer.send('my-topic', key=b'key', value=b'value')

주의사항

  • 네트워크 오류, 브로커 장애 시 메시지 손실 가능
  • 직렬화 에러는 발생 가능 (전송 전 단계)
  • 프로덕션 환경에서는 신중하게 사용

2. Synchronous Send (동기 전송)

개념

메시지를 전송하고 브로커의 응답을 기다리는 방식입니다.

특징

  • 높은 신뢰성: 전송 성공 여부를 확인 가능
  • 낮은 성능: 각 메시지마다 응답을 기다려야 함
  • 에러 처리 용이: 즉시 에러를 감지하고 처리 가능

사용 사례

  • 메시지 손실이 절대 허용되지 않는 경우
  • 금융 거래, 주문 처리 등
  • 순서가 중요한 메시지

구현 예제

Java

ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");
 
try {
    // get()을 호출하여 동기적으로 대기
    RecordMetadata metadata = producer.send(record).get();
 
    System.out.printf("Message sent to partition %d with offset %d%n",
        metadata.partition(), metadata.offset());
 
} catch (ExecutionException e) {
    // 전송 실패 처리
    System.err.println("Error sending message: " + e.getMessage());
 
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

Java (타임아웃 설정)

try {
    // 최대 10초 대기
    RecordMetadata metadata = producer.send(record).get(10, TimeUnit.SECONDS);
 
} catch (TimeoutException e) {
    System.err.println("Timeout waiting for response");
}

Python

try:
    # 동기 전송 (블로킹)
    future = producer.send('my-topic', key=b'key', value=b'value')
    metadata = future.get(timeout=10)
 
    print(f"Message sent to partition {metadata.partition} "
          f"with offset {metadata.offset}")
 
except KafkaError as e:
    print(f"Error sending message: {e}")

성능 영향

  • 각 메시지마다 네트워크 왕복(Round-trip) 발생
  • 처리량이 크게 감소 (초당 수백~수천 메시지 수준)
  • 레이턴시 증가

3. Asynchronous Send (비동기 전송)

개념

메시지를 전송하고 Callback을 통해 결과를 처리하는 방식입니다.

특징

  • 균형잡힌 성능: Fire-and-forget에 근접한 처리량
  • 높은 신뢰성: 전송 결과를 확인 가능
  • 복잡도 증가: Callback 처리 로직 필요

사용 사례

  • 대부분의 프로덕션 환경
  • 높은 처리량과 신뢰성이 모두 필요한 경우
  • 에러를 별도로 처리하고 싶은 경우

구현 예제

Java

ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");
 
// Callback을 등록하여 비동기 전송
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // 전송 실패
            System.err.println("Error sending message: " + exception.getMessage());
            // 재시도, 로깅, 알림 등의 처리
        } else {
            // 전송 성공
            System.out.printf("Message sent to partition %d with offset %d%n",
                metadata.partition(), metadata.offset());
        }
    }
});

Java (Lambda 사용)

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        handleError(exception);
    } else {
        handleSuccess(metadata);
    }
});

Python

def on_send_success(metadata):
    print(f"Message sent to partition {metadata.partition} "
          f"with offset {metadata.offset}")
 
def on_send_error(exception):
    print(f"Error sending message: {exception}")
 
# 비동기 전송
producer.send('my-topic', key=b'key', value=b'value') \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)

Callback 실행 컨텍스트

  • Callback은 Producer의 I/O 스레드에서 실행됨
  • 중요: Callback에서 블로킹 작업을 하면 안 됨
  • 빠른 처리가 필요 (로깅, 메트릭 기록 정도)
  • 무거운 작업은 별도 스레드풀로 위임

좋은 예

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 빠른 로깅
        logger.error("Failed to send message", exception);
        // 메트릭 기록
        metrics.incrementFailureCount();
    }
});

나쁜 예

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // ❌ 블로킹 작업 - Producer I/O 스레드를 차단함
        sendEmailAlert(exception);
        saveToDatabase(exception);
    }
});

전송 방식 비교

특성Fire-and-ForgetSynchronousAsynchronous
처리량최고최저높음
신뢰성최저최고높음
복잡도낮음중간높음
응답 시간즉시느림즉시
에러 처리불가능즉시 가능Callback으로 가능
사용 권장도제한적제한적권장

전송 방식 선택 가이드

Fire-and-Forget 선택 시

✓ 메시지 손실이 허용되는 경우
✓ 최대 처리량이 필요한 경우
✓ 로그, 메트릭 등 비중요 데이터

Synchronous 선택 시

✓ 메시지 손실이 절대 불가한 경우
✓ 처리량보다 신뢰성이 중요한 경우
✓ 순서 보장이 매우 중요한 경우
✓ 간단한 구현이 필요한 경우

Asynchronous 선택 시 (권장)

✓ 대부분의 프로덕션 환경
✓ 높은 처리량과 신뢰성 모두 필요
✓ 에러를 적절히 처리할 수 있는 경우

재시도 메커니즘

모든 전송 방식에서 Producer는 자동 재시도를 수행합니다:

props.put("retries", 3);  // 최대 3번 재시도
props.put("retry.backoff.ms", 100);  // 재시도 간격 100ms

재시도가 발생하는 경우

  • 일시적인 네트워크 오류
  • 브로커가 일시적으로 사용 불가
  • NOT_LEADER_FOR_PARTITION 에러 (리더 선출 중)

재시도하지 않는 경우

  • 직렬화 에러
  • 메시지 크기 초과
  • 잘못된 토픽 이름

Best Practices

1. 기본적으로 Asynchronous 사용

// 대부분의 경우 이 방식을 사용
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        handleError(record, exception);
    }
});

2. Callback에서 블로킹 작업 금지

// ✓ 좋은 예: 빠른 처리
producer.send(record, (metadata, exception) -> {
    logger.info("Sent: {}", metadata);
});
 
// ✗ 나쁜 예: 블로킹 작업
producer.send(record, (metadata, exception) -> {
    database.save(metadata);  // 블로킹!
});

3. 중요 메시지는 재시도 설정 강화

props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1);  // 순서 보장
props.put("enable.idempotence", true);  // 중복 방지

4. 에러 처리 전략 수립

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (isRetriableError(exception)) {
            // 재시도 큐에 추가
            retryQueue.add(record);
        } else {
            // DLQ(Dead Letter Queue)로 전송
            dlqProducer.send(createDLQRecord(record, exception));
        }
    }
});

관련 문서