Consumer Lag 모니터링

Consumer Lag은 Kafka 시스템의 건강 상태를 나타내는 핵심 지표입니다.

Consumer Lag 개념

정의

Consumer Lag은 Producer가 생산한 메시지와 Consumer가 소비한 메시지 간의 차이입니다.

Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ 0  │ 1  │ 2  │ 3  │ 4  │ 5  │ 6  │ 7  │ 8  │ 9  │
└────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘
                    ↑                             ↑
              Consumer                      Log End
               Offset                        Offset
                (5)                           (10)

Consumer Lag = Log End Offset - Consumer Offset = 10 - 5 = 5

Lag 계산 공식

Consumer Lag = Log End Offset - Committed Offset
             = (가장 최근 메시지 위치) - (마지막으로 커밋된 위치)

Lag의 의미

Lag 상태의미
Lag = 0실시간 처리 중
Lag 증가소비 속도 < 생산 속도
Lag 감소소비 속도 > 생산 속도
Lag 일정소비 속도 ≈ 생산 속도

Lag 발생 원인

1. Consumer 처리 속도 부족

생산: 10,000 msg/s
소비: 5,000 msg/s
→ 초당 5,000 메시지씩 Lag 증가

2. Consumer 장애

  • Consumer 다운
  • 네트워크 문제
  • 리밸런싱 중 일시 중단

3. 불균형한 파티션 할당

Partition 0: 1,000,000 messages (Consumer 1)
Partition 1: 100,000 messages (Consumer 2)
→ Consumer 1에서 높은 Lag

4. 처리 로직 병목

  • 외부 API 호출 지연
  • 데이터베이스 쓰기 지연
  • 복잡한 변환 로직

5. 리소스 부족

  • CPU/메모리 부족
  • 네트워크 대역폭 한계
  • 디스크 I/O 병목

Lag 모니터링 방법

1. Kafka 명령행 도구

kafka-consumer-groups.sh

# Consumer Group 상태 확인
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --group my-consumer-group

출력 예시:

GROUP           TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                HOST           CLIENT-ID
my-group        my-topic   0          1000            1050            50     consumer-1-xxx                             /192.168.1.10  consumer-1
my-group        my-topic   1          2000            2000            0      consumer-2-xxx                             /192.168.1.11  consumer-2
my-group        my-topic   2          1500            1600            100    consumer-3-xxx                             /192.168.1.12  consumer-3

전체 Lag 합계

kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --group my-consumer-group \
    | awk 'NR>1 {sum+=$6} END {print "Total Lag: " sum}'

2. Java Consumer API

// Consumer 내부에서 Lag 확인
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
 
for (TopicPartition partition : consumer.assignment()) {
    long currentPosition = consumer.position(partition);
    long endOffset = endOffsets.get(partition);
    long lag = endOffset - currentPosition;
 
    System.out.printf("Partition %s: position=%d, end=%d, lag=%d%n",
        partition, currentPosition, endOffset, lag);
}

3. AdminClient API

AdminClient admin = AdminClient.create(props);
 
// Consumer Group 오프셋 조회
ListConsumerGroupOffsetsResult offsetsResult =
    admin.listConsumerGroupOffsets("my-consumer-group");
 
Map<TopicPartition, OffsetAndMetadata> offsets =
    offsetsResult.partitionsToOffsetAndMetadata().get();
 
// Log End Offset 조회
Map<TopicPartition, Long> endOffsets =
    consumer.endOffsets(offsets.keySet());
 
// Lag 계산
for (TopicPartition partition : offsets.keySet()) {
    long committed = offsets.get(partition).offset();
    long end = endOffsets.get(partition);
    long lag = end - committed;
 
    System.out.printf("Partition %s: lag=%d%n", partition, lag);
}

4. JMX 메트릭

Consumer 메트릭

kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}
메트릭설명
records-lag파티션별 Lag
records-lag-max최대 Lag
records-lag-avg평균 Lag

JMX 접근

// JMX를 통한 메트릭 조회
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(
    "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my-consumer");
 
Double lagMax = (Double) mBeanServer.getAttribute(objectName, "records-lag-max");
System.out.println("Max Lag: " + lagMax);

모니터링 도구 연동

Prometheus + Grafana

kafka_exporter 사용

# docker-compose.yml
kafka-exporter:
  image: danielqsj/kafka-exporter
  command:
    - '--kafka.server=kafka:9092'
  ports:
    - "9308:9308"

주요 메트릭

# Consumer Group Lag
kafka_consumergroup_lag
 
# 특정 그룹의 총 Lag
sum(kafka_consumergroup_lag{consumergroup="my-group"})
 
# Lag 증가율
rate(kafka_consumergroup_lag[5m])

Grafana 대시보드 쿼리

# 파티션별 Lag
kafka_consumergroup_lag{consumergroup="$consumer_group", topic="$topic"}
 
# Lag 추이
increase(kafka_consumergroup_lag{consumergroup="$consumer_group"}[1h])
 
# Lag 알림 조건
kafka_consumergroup_lag > 10000

Burrow (LinkedIn)

Kafka Consumer Lag 체크 전용 도구입니다.

# burrow.toml 설정
[zookeeper]
servers = ["zk1:2181", "zk2:2181"]
 
[kafka.cluster]
brokers = ["kafka1:9092", "kafka2:9092"]
 
[consumer.group]
class-name = "kafka"
 
[httpserver]
port = 8000

Burrow API

# Consumer Group 상태
curl http://localhost:8000/v3/kafka/cluster/consumer/my-group/status
 
# Lag 조회
curl http://localhost:8000/v3/kafka/cluster/consumer/my-group/lag

Datadog / New Relic

// Datadog 메트릭 전송
StatsDClient statsd = new NonBlockingStatsDClient("kafka", "localhost", 8125);
 
// Lag 메트릭 전송
statsd.gauge("kafka.consumer.lag", lag,
    "consumer_group:" + groupId,
    "topic:" + topic,
    "partition:" + partition);

알림 설정

알림 기준

수준Lag 기준조치
Info> 1,000모니터링
Warning> 10,000확인 필요
Critical> 100,000즉시 대응

Prometheus AlertManager 규칙

groups:
  - name: kafka-consumer-alerts
    rules:
      - alert: KafkaConsumerLagWarning
        expr: kafka_consumergroup_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High Consumer Lag detected"
          description: "Consumer group {{ $labels.consumergroup }} has lag {{ $value }}"
 
      - alert: KafkaConsumerLagCritical
        expr: kafka_consumergroup_lag > 100000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Critical Consumer Lag"
          description: "Consumer group {{ $labels.consumergroup }} lag is critically high"
 
      - alert: KafkaConsumerLagIncreasing
        expr: rate(kafka_consumergroup_lag[10m]) > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer Lag increasing rapidly"

Lag 해결 방안

1. Consumer 스케일 아웃

// 파티션 수 = Consumer 수 권장
// 토픽 파티션: 6개
// Consumer Group: 6개 Consumer

2. 처리 로직 최적화

// Before: 동기 처리
for (ConsumerRecord<String, String> record : records) {
    processSync(record);  // 100ms per record
}
 
// After: 비동기 처리
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
 
for (ConsumerRecord<String, String> record : records) {
    futures.add(executor.submit(() -> processAsync(record)));
}
 
// 모든 처리 완료 대기
for (Future<?> future : futures) {
    future.get();
}
consumer.commitSync();

3. 배치 크기 조정

// 처리량 증가를 위한 배치 크기 증가
props.put("max.poll.records", "1000");
props.put("fetch.min.bytes", "1048576");  // 1MB

4. 파티션 재분배

# 파티션 수 증가 (주의: 되돌릴 수 없음)
kafka-topics.sh --alter \
    --topic my-topic \
    --partitions 12 \
    --bootstrap-server localhost:9092

5. Consumer 설정 튜닝

// 긴 처리 시간 허용
props.put("max.poll.interval.ms", "600000");  // 10분
 
// 더 많은 데이터 가져오기
props.put("fetch.max.bytes", "104857600");  // 100MB

Lag 모니터링 Best Practices

1. 다중 지표 활용

# Lag 절대값
kafka_consumergroup_lag
 
# Lag 증가율
rate(kafka_consumergroup_lag[5m])
 
# Consumer 처리율
rate(kafka_consumer_records_consumed_total[5m])

2. 기준선 설정

평상시 Lag: 100-500
허용 범위: < 5,000
경고: > 10,000
위험: > 50,000

3. 대시보드 구성

[ Consumer Group Overview ]
├── Total Lag across all partitions
├── Lag per partition (heatmap)
├── Lag trend (time series)
├── Consumer count
└── Messages consumed rate

[ Alerts Panel ]
├── Active critical alerts
├── Recent warnings
└── Lag increase rate alerts

4. 정기 점검

# 일일 점검 스크립트
#!/bin/bash
GROUPS=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list)
 
for GROUP in $GROUPS; do
    echo "=== $GROUP ==="
    kafka-consumer-groups.sh \
        --bootstrap-server localhost:9092 \
        --describe \
        --group $GROUP \
        | grep -v "^$"
done

5. 자동 스케일링 연동

# Kubernetes HPA 예시
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafka-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kafka-consumer
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumergroup_lag
          selector:
            matchLabels:
              consumergroup: my-group
        target:
          type: AverageValue
          averageValue: "5000"

관련 문서