메시지 전송 방식
Kafka Producer는 세 가지 메시지 전송 방식을 제공합니다. 각 방식은 성능, 신뢰성, 복잡성 측면에서 서로 다른 특성을 가지고 있습니다.
1. Fire-and-Forget (발사 후 망각)
개념
메시지를 전송하고 결과를 확인하지 않는 방식입니다.
특징
- 최고 성능: 응답을 기다리지 않으므로 처리량이 가장 높음
- 최저 신뢰성: 메시지 손실 가능성이 있음
- 가장 간단: 코드가 단순하고 구현이 쉬움
사용 사례
- 메시지 손실이 허용되는 경우
- 로그 수집, 메트릭 수집 등
- 높은 처리량이 중요한 경우
구현 예제
Java
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
// 전송 후 결과를 확인하지 않음
producer.send(record);Python
# 전송 후 결과를 확인하지 않음
producer.send('my-topic', key=b'key', value=b'value')주의사항
- 네트워크 오류, 브로커 장애 시 메시지 손실 가능
- 직렬화 에러는 발생 가능 (전송 전 단계)
- 프로덕션 환경에서는 신중하게 사용
2. Synchronous Send (동기 전송)
개념
메시지를 전송하고 브로커의 응답을 기다리는 방식입니다.
특징
- 높은 신뢰성: 전송 성공 여부를 확인 가능
- 낮은 성능: 각 메시지마다 응답을 기다려야 함
- 에러 처리 용이: 즉시 에러를 감지하고 처리 가능
사용 사례
- 메시지 손실이 절대 허용되지 않는 경우
- 금융 거래, 주문 처리 등
- 순서가 중요한 메시지
구현 예제
Java
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
try {
// get()을 호출하여 동기적으로 대기
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Message sent to partition %d with offset %d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException e) {
// 전송 실패 처리
System.err.println("Error sending message: " + e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}Java (타임아웃 설정)
try {
// 최대 10초 대기
RecordMetadata metadata = producer.send(record).get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.err.println("Timeout waiting for response");
}Python
try:
# 동기 전송 (블로킹)
future = producer.send('my-topic', key=b'key', value=b'value')
metadata = future.get(timeout=10)
print(f"Message sent to partition {metadata.partition} "
f"with offset {metadata.offset}")
except KafkaError as e:
print(f"Error sending message: {e}")성능 영향
- 각 메시지마다 네트워크 왕복(Round-trip) 발생
- 처리량이 크게 감소 (초당 수백~수천 메시지 수준)
- 레이턴시 증가
3. Asynchronous Send (비동기 전송)
개념
메시지를 전송하고 Callback을 통해 결과를 처리하는 방식입니다.
특징
- 균형잡힌 성능: Fire-and-forget에 근접한 처리량
- 높은 신뢰성: 전송 결과를 확인 가능
- 복잡도 증가: Callback 처리 로직 필요
사용 사례
- 대부분의 프로덕션 환경
- 높은 처리량과 신뢰성이 모두 필요한 경우
- 에러를 별도로 처리하고 싶은 경우
구현 예제
Java
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
// Callback을 등록하여 비동기 전송
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 전송 실패
System.err.println("Error sending message: " + exception.getMessage());
// 재시도, 로깅, 알림 등의 처리
} else {
// 전송 성공
System.out.printf("Message sent to partition %d with offset %d%n",
metadata.partition(), metadata.offset());
}
}
});Java (Lambda 사용)
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleError(exception);
} else {
handleSuccess(metadata);
}
});Python
def on_send_success(metadata):
print(f"Message sent to partition {metadata.partition} "
f"with offset {metadata.offset}")
def on_send_error(exception):
print(f"Error sending message: {exception}")
# 비동기 전송
producer.send('my-topic', key=b'key', value=b'value') \
.add_callback(on_send_success) \
.add_errback(on_send_error)Callback 실행 컨텍스트
- Callback은 Producer의 I/O 스레드에서 실행됨
- 중요: Callback에서 블로킹 작업을 하면 안 됨
- 빠른 처리가 필요 (로깅, 메트릭 기록 정도)
- 무거운 작업은 별도 스레드풀로 위임
좋은 예
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 빠른 로깅
logger.error("Failed to send message", exception);
// 메트릭 기록
metrics.incrementFailureCount();
}
});나쁜 예
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// ❌ 블로킹 작업 - Producer I/O 스레드를 차단함
sendEmailAlert(exception);
saveToDatabase(exception);
}
});전송 방식 비교
| 특성 | Fire-and-Forget | Synchronous | Asynchronous |
|---|---|---|---|
| 처리량 | 최고 | 최저 | 높음 |
| 신뢰성 | 최저 | 최고 | 높음 |
| 복잡도 | 낮음 | 중간 | 높음 |
| 응답 시간 | 즉시 | 느림 | 즉시 |
| 에러 처리 | 불가능 | 즉시 가능 | Callback으로 가능 |
| 사용 권장도 | 제한적 | 제한적 | 권장 |
전송 방식 선택 가이드
Fire-and-Forget 선택 시
✓ 메시지 손실이 허용되는 경우
✓ 최대 처리량이 필요한 경우
✓ 로그, 메트릭 등 비중요 데이터
Synchronous 선택 시
✓ 메시지 손실이 절대 불가한 경우
✓ 처리량보다 신뢰성이 중요한 경우
✓ 순서 보장이 매우 중요한 경우
✓ 간단한 구현이 필요한 경우
Asynchronous 선택 시 (권장)
✓ 대부분의 프로덕션 환경
✓ 높은 처리량과 신뢰성 모두 필요
✓ 에러를 적절히 처리할 수 있는 경우
재시도 메커니즘
모든 전송 방식에서 Producer는 자동 재시도를 수행합니다:
props.put("retries", 3); // 최대 3번 재시도
props.put("retry.backoff.ms", 100); // 재시도 간격 100ms재시도가 발생하는 경우
- 일시적인 네트워크 오류
- 브로커가 일시적으로 사용 불가
NOT_LEADER_FOR_PARTITION에러 (리더 선출 중)
재시도하지 않는 경우
- 직렬화 에러
- 메시지 크기 초과
- 잘못된 토픽 이름
Best Practices
1. 기본적으로 Asynchronous 사용
// 대부분의 경우 이 방식을 사용
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleError(record, exception);
}
});2. Callback에서 블로킹 작업 금지
// ✓ 좋은 예: 빠른 처리
producer.send(record, (metadata, exception) -> {
logger.info("Sent: {}", metadata);
});
// ✗ 나쁜 예: 블로킹 작업
producer.send(record, (metadata, exception) -> {
database.save(metadata); // 블로킹!
});3. 중요 메시지는 재시도 설정 강화
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1); // 순서 보장
props.put("enable.idempotence", true); // 중복 방지4. 에러 처리 전략 수립
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (isRetriableError(exception)) {
// 재시도 큐에 추가
retryQueue.add(record);
} else {
// DLQ(Dead Letter Queue)로 전송
dlqProducer.send(createDLQRecord(record, exception));
}
}
});
댓글 (0)