Partitioner와 메시지 분배 전략

Partitioner는 Producer가 메시지를 어느 파티션으로 전송할지 결정하는 컴포넌트입니다. 올바른 파티셔닝 전략은 데이터 분산, 처리 순서, 성능에 큰 영향을 미칩니다.

Partitioner의 역할

ProducerRecord (Topic, Key, Value)
    ↓
Partitioner
    ↓
Partition 0, 1, 2, ... N

Partitioner는 다음을 결정합니다:

  1. 메시지가 저장될 파티션 번호
  2. 같은 키를 가진 메시지의 순서 보장
  3. 파티션 간 로드 밸런싱

기본 파티셔닝 전략

1. 키 기반 파티셔닝 (Key-based)

메시지에 키가 있는 경우, 키의 해시값을 기반으로 파티션을 결정합니다.

동작 방식

partition = hash(key) % number_of_partitions

특징

  • 순서 보장: 같은 키를 가진 메시지는 항상 같은 파티션으로 전송
  • 일관성: 키가 동일하면 항상 동일한 파티션에 할당
  • 불균형 가능: 특정 키가 많으면 파티션 불균형 발생 가능

예제

// 키를 지정하여 전송
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "user-123", orderData);
 
// user-123 키를 가진 모든 메시지는 같은 파티션으로 전송됨

사용 사례

  • 사용자별 이벤트 순서 보장 (userId를 키로 사용)
  • 세션별 로그 그룹핑 (sessionId를 키로 사용)
  • 디바이스별 데이터 정렬 (deviceId를 키로 사용)

2. 라운드 로빈 (Round-robin)

메시지에 키가 없는 경우, 모든 파티션에 순차적으로 분산합니다.

동작 방식 (Kafka 2.4+)

  • Sticky Partitioning: 배치가 가득 차거나 linger.ms 타임아웃까지 같은 파티션 사용
  • 이전 버전: 메시지마다 파티션 변경 (비효율적)

특징

  • 균등 분산: 모든 파티션에 균등하게 메시지 분배
  • 높은 처리량: Sticky Partitioning으로 배치 효율 향상
  • 순서 없음: 전체 순서는 보장되지 않음

예제

// 키 없이 전송 (null 키)
ProducerRecord<String, String> record =
    new ProducerRecord<>("logs", null, logData);
 
// 파티션이 자동으로 선택됨 (라운드 로빈)

사용 사례

  • 순서가 중요하지 않은 로그
  • 메트릭 데이터
  • 단순 데이터 분산

3. 명시적 파티션 지정

파티션 번호를 직접 지정할 수 있습니다.

예제

// 파티션 2로 직접 전송
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", 2, "key", "value");

주의사항

  • 파티션 번호를 잘못 지정하면 에러 발생
  • 파티션 수가 변경되면 코드 수정 필요
  • 일반적으로 권장하지 않음

커스텀 Partitioner 구현

기본 파티셔닝 전략이 적합하지 않은 경우 커스텀 파티셔너를 구현할 수 있습니다.

Partitioner 인터페이스

public interface Partitioner extends Configurable, Closeable {
    /**
     * 파티션 번호를 계산
     */
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);
 
    /**
     * Partitioner 종료 시 호출
     */
    void close();
 
    /**
     * 설정 초기화 시 호출
     */
    void configure(Map<String, ?> configs);
}

예제 1: VIP 고객 전용 파티션

public class VIPPartitioner implements Partitioner {
 
    private static final int VIP_PARTITION = 0;
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
 
        int numPartitions = cluster.partitionCountForTopic(topic);
 
        // VIP 고객은 파티션 0으로
        if (key != null && ((String) key).startsWith("VIP-")) {
            return VIP_PARTITION;
        }
 
        // 일반 고객은 나머지 파티션에 해시 분산
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1) + 1;
    }
 
    @Override
    public void close() {}
 
    @Override
    public void configure(Map<String, ?> configs) {}
}

사용 예제:

props.put("partitioner.class", "com.example.VIPPartitioner");

예제 2: 지역 기반 파티셔닝

public class RegionPartitioner implements Partitioner {
 
    private Map<String, Integer> regionToPartition = new HashMap<>();
 
    @Override
    public void configure(Map<String, ?> configs) {
        // 설정에서 지역-파티션 매핑 로드
        regionToPartition.put("US", 0);
        regionToPartition.put("EU", 1);
        regionToPartition.put("ASIA", 2);
    }
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
 
        int numPartitions = cluster.partitionCountForTopic(topic);
 
        if (key != null) {
            String region = extractRegion((String) key);
            Integer partition = regionToPartition.get(region);
 
            if (partition != null) {
                return partition % numPartitions;
            }
        }
 
        // 기본값: 라운드 로빈
        return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
    }
 
    private String extractRegion(String key) {
        // key 형식: "US:user123" -> "US" 추출
        return key.split(":")[0];
    }
 
    @Override
    public void close() {}
}

예제 3: 시간 기반 파티셔닝

public class TimeBasedPartitioner implements Partitioner {
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
 
        int numPartitions = cluster.partitionCountForTopic(topic);
 
        // 시간대별로 파티션 분배 (0-5시: P0, 6-11시: P1, ...)
        int hour = LocalDateTime.now().getHour();
        int partitionByHour = hour / (24 / numPartitions);
 
        return partitionByHour % numPartitions;
    }
 
    @Override
    public void close() {}
 
    @Override
    public void configure(Map<String, ?> configs) {}
}

파티셔닝 전략 선택 가이드

순서가 중요한 경우

// 키를 사용하여 순서 보장
ProducerRecord<String, Order> record =
    new ProducerRecord<>("orders", order.getUserId(), order);
 
// 같은 userId의 주문은 순서대로 처리됨

로드 밸런싱이 중요한 경우

// 키 없이 전송하여 균등 분산
ProducerRecord<String, Log> record =
    new ProducerRecord<>("logs", null, logData);
 
// 또는 랜덤 키 사용
String randomKey = UUID.randomUUID().toString();
ProducerRecord<String, Log> record =
    new ProducerRecord<>("logs", randomKey, logData);

특정 그룹핑이 필요한 경우

// 커스텀 파티셔너 사용
props.put("partitioner.class", "com.example.RegionPartitioner");
 
// 지역 정보를 키에 포함
ProducerRecord<String, Event> record =
    new ProducerRecord<>("events", "US:event123", eventData);

파티션 수 변경의 영향

파티션 추가 시 주의사항

파티션 수가 변경되면 키-파티션 매핑이 변경됩니다:

# 원래: 3개 파티션
key "user-123" → hash % 3 = partition 1

# 변경 후: 5개 파티션
key "user-123" → hash % 5 = partition 3  # 매핑 변경!

영향

  • 같은 키의 메시지가 다른 파티션으로 분산될 수 있음
  • 순서 보장이 깨질 수 있음
  • Consumer의 키별 상태가 분산됨

대응 방법

  1. 초기 설계: 충분한 파티션 수로 시작
  2. 일관성 해싱: 커스텀 파티셔너로 영향 최소화
  3. 재처리: 파티션 변경 후 일정 기간 재처리

성능 최적화

1. Sticky Partitioning 활용 (Kafka 2.4+)

키가 없는 메시지는 자동으로 Sticky Partitioning 적용됩니다.

// 설정 불필요 - 기본 동작
ProducerRecord<String, String> record =
    new ProducerRecord<>("topic", null, value);

효과:

  • 배치 효율 증가 (같은 파티션으로 메시지 모음)
  • 처리량 30% 이상 향상
  • 레이턴시 감소

2. 파티션 수 최적화

파티션 수 = max(
    목표 처리량 / Consumer 처리량,
    목표 처리량 / Producer 처리량
)

예제:

  • 목표: 100MB/s 처리
  • Consumer 처리량: 10MB/s
  • 최소 파티션 수: 100 / 10 = 10개

3. 핫 파티션 방지

특정 키가 과도하게 많은 경우 솔트(salt) 추가:

public class SaltedPartitioner implements Partitioner {
 
    private Random random = new Random();
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
 
        int numPartitions = cluster.partitionCountForTopic(topic);
 
        if (key != null) {
            String keyStr = (String) key;
 
            // 핫 키에 랜덤 솔트 추가
            if (isHotKey(keyStr)) {
                int salt = random.nextInt(4);  // 4개 파티션에 분산
                String saltedKey = keyStr + "-" + salt;
                return Math.abs(saltedKey.hashCode()) % numPartitions;
            }
 
            // 일반 키는 기본 해싱
            return Math.abs(keyStr.hashCode()) % numPartitions;
        }
 
        return random.nextInt(numPartitions);
    }
 
    private boolean isHotKey(String key) {
        // 핫 키 판단 로직
        return key.startsWith("popular-");
    }
 
    @Override
    public void close() {}
 
    @Override
    public void configure(Map<String, ?> configs) {}
}

파티셔닝 Best Practices

1. 의미있는 키 선택

// ✓ 좋은 예: 비즈니스 의미가 있는 키
record = new ProducerRecord<>("orders", order.getUserId(), order);
 
// ✗ 나쁜 예: 의미없는 랜덤 키
record = new ProducerRecord<>("orders", UUID.randomUUID().toString(), order);

2. 키 설계 시 고려사항

  • 카디널리티: 키의 고유값 수가 파티션 수보다 많아야 함
  • 분포: 키가 골고루 분산되어야 함
  • 안정성: 키 값이 변경되지 않아야 함

3. 파티션 수는 변경 어려움

// 초기에 충분한 파티션 수 설정
AdminClient admin = AdminClient.create(props);
NewTopic topic = new NewTopic("my-topic", 30, (short) 3);
admin.createTopics(Collections.singleton(topic));

4. 순서 보장이 필요한 경우

// 관련 메시지에 같은 키 사용
ProducerRecord<String, OrderEvent> record =
    new ProducerRecord<>("order-events",
        order.getOrderId(),  // 주문 ID를 키로
        orderEvent);

5. 모니터링

// 파티션별 메시지 분포 모니터링
Callback callback = (metadata, exception) -> {
    if (exception == null) {
        metricsCollector.recordPartition(metadata.partition());
    }
};
 
producer.send(record, callback);

일반적인 실수

1. null 키와 순서 보장 혼동

// ✗ 잘못된 가정: null 키로도 순서 보장됨
ProducerRecord<String, String> record =
    new ProducerRecord<>("topic", null, value);
// 파티션이 매번 바뀔 수 있어 순서 보장 안 됨

2. 파티션 수 자주 변경

// ✗ 파티션 수를 자주 변경하면 키 매핑이 깨짐
// 초기에 충분한 파티션 수로 설계

3. 핫 키 문제 무시

// ✗ 인기 사용자의 이벤트가 한 파티션에 집중
record = new ProducerRecord<>("events", popularUserId, event);
// 해당 파티션이 병목이 됨

관련 문서