Partitioner와 메시지 분배 전략
Partitioner는 Producer가 메시지를 어느 파티션으로 전송할지 결정하는 컴포넌트입니다. 올바른 파티셔닝 전략은 데이터 분산, 처리 순서, 성능에 큰 영향을 미칩니다.
Partitioner의 역할
ProducerRecord (Topic, Key, Value)
↓
Partitioner
↓
Partition 0, 1, 2, ... N
Partitioner는 다음을 결정합니다:
- 메시지가 저장될 파티션 번호
- 같은 키를 가진 메시지의 순서 보장
- 파티션 간 로드 밸런싱
기본 파티셔닝 전략
1. 키 기반 파티셔닝 (Key-based)
메시지에 키가 있는 경우, 키의 해시값을 기반으로 파티션을 결정합니다.
동작 방식
partition = hash(key) % number_of_partitions
특징
- 순서 보장: 같은 키를 가진 메시지는 항상 같은 파티션으로 전송
- 일관성: 키가 동일하면 항상 동일한 파티션에 할당
- 불균형 가능: 특정 키가 많으면 파티션 불균형 발생 가능
예제
// 키를 지정하여 전송
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "user-123", orderData);
// user-123 키를 가진 모든 메시지는 같은 파티션으로 전송됨사용 사례
- 사용자별 이벤트 순서 보장 (userId를 키로 사용)
- 세션별 로그 그룹핑 (sessionId를 키로 사용)
- 디바이스별 데이터 정렬 (deviceId를 키로 사용)
2. 라운드 로빈 (Round-robin)
메시지에 키가 없는 경우, 모든 파티션에 순차적으로 분산합니다.
동작 방식 (Kafka 2.4+)
- Sticky Partitioning: 배치가 가득 차거나 linger.ms 타임아웃까지 같은 파티션 사용
- 이전 버전: 메시지마다 파티션 변경 (비효율적)
특징
- 균등 분산: 모든 파티션에 균등하게 메시지 분배
- 높은 처리량: Sticky Partitioning으로 배치 효율 향상
- 순서 없음: 전체 순서는 보장되지 않음
예제
// 키 없이 전송 (null 키)
ProducerRecord<String, String> record =
new ProducerRecord<>("logs", null, logData);
// 파티션이 자동으로 선택됨 (라운드 로빈)사용 사례
- 순서가 중요하지 않은 로그
- 메트릭 데이터
- 단순 데이터 분산
3. 명시적 파티션 지정
파티션 번호를 직접 지정할 수 있습니다.
예제
// 파티션 2로 직접 전송
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", 2, "key", "value");주의사항
- 파티션 번호를 잘못 지정하면 에러 발생
- 파티션 수가 변경되면 코드 수정 필요
- 일반적으로 권장하지 않음
커스텀 Partitioner 구현
기본 파티셔닝 전략이 적합하지 않은 경우 커스텀 파티셔너를 구현할 수 있습니다.
Partitioner 인터페이스
public interface Partitioner extends Configurable, Closeable {
/**
* 파티션 번호를 계산
*/
int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster);
/**
* Partitioner 종료 시 호출
*/
void close();
/**
* 설정 초기화 시 호출
*/
void configure(Map<String, ?> configs);
}예제 1: VIP 고객 전용 파티션
public class VIPPartitioner implements Partitioner {
private static final int VIP_PARTITION = 0;
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
// VIP 고객은 파티션 0으로
if (key != null && ((String) key).startsWith("VIP-")) {
return VIP_PARTITION;
}
// 일반 고객은 나머지 파티션에 해시 분산
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1) + 1;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}사용 예제:
props.put("partitioner.class", "com.example.VIPPartitioner");예제 2: 지역 기반 파티셔닝
public class RegionPartitioner implements Partitioner {
private Map<String, Integer> regionToPartition = new HashMap<>();
@Override
public void configure(Map<String, ?> configs) {
// 설정에서 지역-파티션 매핑 로드
regionToPartition.put("US", 0);
regionToPartition.put("EU", 1);
regionToPartition.put("ASIA", 2);
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (key != null) {
String region = extractRegion((String) key);
Integer partition = regionToPartition.get(region);
if (partition != null) {
return partition % numPartitions;
}
}
// 기본값: 라운드 로빈
return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
}
private String extractRegion(String key) {
// key 형식: "US:user123" -> "US" 추출
return key.split(":")[0];
}
@Override
public void close() {}
}예제 3: 시간 기반 파티셔닝
public class TimeBasedPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
// 시간대별로 파티션 분배 (0-5시: P0, 6-11시: P1, ...)
int hour = LocalDateTime.now().getHour();
int partitionByHour = hour / (24 / numPartitions);
return partitionByHour % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}파티셔닝 전략 선택 가이드
순서가 중요한 경우
// 키를 사용하여 순서 보장
ProducerRecord<String, Order> record =
new ProducerRecord<>("orders", order.getUserId(), order);
// 같은 userId의 주문은 순서대로 처리됨로드 밸런싱이 중요한 경우
// 키 없이 전송하여 균등 분산
ProducerRecord<String, Log> record =
new ProducerRecord<>("logs", null, logData);
// 또는 랜덤 키 사용
String randomKey = UUID.randomUUID().toString();
ProducerRecord<String, Log> record =
new ProducerRecord<>("logs", randomKey, logData);특정 그룹핑이 필요한 경우
// 커스텀 파티셔너 사용
props.put("partitioner.class", "com.example.RegionPartitioner");
// 지역 정보를 키에 포함
ProducerRecord<String, Event> record =
new ProducerRecord<>("events", "US:event123", eventData);파티션 수 변경의 영향
파티션 추가 시 주의사항
파티션 수가 변경되면 키-파티션 매핑이 변경됩니다:
# 원래: 3개 파티션
key "user-123" → hash % 3 = partition 1
# 변경 후: 5개 파티션
key "user-123" → hash % 5 = partition 3 # 매핑 변경!
영향
- 같은 키의 메시지가 다른 파티션으로 분산될 수 있음
- 순서 보장이 깨질 수 있음
- Consumer의 키별 상태가 분산됨
대응 방법
- 초기 설계: 충분한 파티션 수로 시작
- 일관성 해싱: 커스텀 파티셔너로 영향 최소화
- 재처리: 파티션 변경 후 일정 기간 재처리
성능 최적화
1. Sticky Partitioning 활용 (Kafka 2.4+)
키가 없는 메시지는 자동으로 Sticky Partitioning 적용됩니다.
// 설정 불필요 - 기본 동작
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", null, value);효과:
- 배치 효율 증가 (같은 파티션으로 메시지 모음)
- 처리량 30% 이상 향상
- 레이턴시 감소
2. 파티션 수 최적화
파티션 수 = max(
목표 처리량 / Consumer 처리량,
목표 처리량 / Producer 처리량
)
예제:
- 목표: 100MB/s 처리
- Consumer 처리량: 10MB/s
- 최소 파티션 수: 100 / 10 = 10개
3. 핫 파티션 방지
특정 키가 과도하게 많은 경우 솔트(salt) 추가:
public class SaltedPartitioner implements Partitioner {
private Random random = new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (key != null) {
String keyStr = (String) key;
// 핫 키에 랜덤 솔트 추가
if (isHotKey(keyStr)) {
int salt = random.nextInt(4); // 4개 파티션에 분산
String saltedKey = keyStr + "-" + salt;
return Math.abs(saltedKey.hashCode()) % numPartitions;
}
// 일반 키는 기본 해싱
return Math.abs(keyStr.hashCode()) % numPartitions;
}
return random.nextInt(numPartitions);
}
private boolean isHotKey(String key) {
// 핫 키 판단 로직
return key.startsWith("popular-");
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}파티셔닝 Best Practices
1. 의미있는 키 선택
// ✓ 좋은 예: 비즈니스 의미가 있는 키
record = new ProducerRecord<>("orders", order.getUserId(), order);
// ✗ 나쁜 예: 의미없는 랜덤 키
record = new ProducerRecord<>("orders", UUID.randomUUID().toString(), order);2. 키 설계 시 고려사항
- 카디널리티: 키의 고유값 수가 파티션 수보다 많아야 함
- 분포: 키가 골고루 분산되어야 함
- 안정성: 키 값이 변경되지 않아야 함
3. 파티션 수는 변경 어려움
// 초기에 충분한 파티션 수 설정
AdminClient admin = AdminClient.create(props);
NewTopic topic = new NewTopic("my-topic", 30, (short) 3);
admin.createTopics(Collections.singleton(topic));4. 순서 보장이 필요한 경우
// 관련 메시지에 같은 키 사용
ProducerRecord<String, OrderEvent> record =
new ProducerRecord<>("order-events",
order.getOrderId(), // 주문 ID를 키로
orderEvent);5. 모니터링
// 파티션별 메시지 분포 모니터링
Callback callback = (metadata, exception) -> {
if (exception == null) {
metricsCollector.recordPartition(metadata.partition());
}
};
producer.send(record, callback);일반적인 실수
1. null 키와 순서 보장 혼동
// ✗ 잘못된 가정: null 키로도 순서 보장됨
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", null, value);
// 파티션이 매번 바뀔 수 있어 순서 보장 안 됨2. 파티션 수 자주 변경
// ✗ 파티션 수를 자주 변경하면 키 매핑이 깨짐
// 초기에 충분한 파티션 수로 설계3. 핫 키 문제 무시
// ✗ 인기 사용자의 이벤트가 한 파티션에 집중
record = new ProducerRecord<>("events", popularUserId, event);
// 해당 파티션이 병목이 됨
댓글 (0)