Consumer Lag 모니터링
Consumer Lag은 Kafka 시스템의 건강 상태를 나타내는 핵심 지표입니다.
Consumer Lag 개념
정의
Consumer Lag은 Producer가 생산한 메시지와 Consumer가 소비한 메시지 간의 차이입니다.
Partition:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘
↑ ↑
Consumer Log End
Offset Offset
(5) (10)
Consumer Lag = Log End Offset - Consumer Offset = 10 - 5 = 5
Lag 계산 공식
Consumer Lag = Log End Offset - Committed Offset
= (가장 최근 메시지 위치) - (마지막으로 커밋된 위치)
Lag의 의미
| Lag 상태 | 의미 |
|---|---|
| Lag = 0 | 실시간 처리 중 |
| Lag 증가 | 소비 속도 < 생산 속도 |
| Lag 감소 | 소비 속도 > 생산 속도 |
| Lag 일정 | 소비 속도 ≈ 생산 속도 |
Lag 발생 원인
1. Consumer 처리 속도 부족
생산: 10,000 msg/s
소비: 5,000 msg/s
→ 초당 5,000 메시지씩 Lag 증가
2. Consumer 장애
- Consumer 다운
- 네트워크 문제
- 리밸런싱 중 일시 중단
3. 불균형한 파티션 할당
Partition 0: 1,000,000 messages (Consumer 1)
Partition 1: 100,000 messages (Consumer 2)
→ Consumer 1에서 높은 Lag
4. 처리 로직 병목
- 외부 API 호출 지연
- 데이터베이스 쓰기 지연
- 복잡한 변환 로직
5. 리소스 부족
- CPU/메모리 부족
- 네트워크 대역폭 한계
- 디스크 I/O 병목
Lag 모니터링 방법
1. Kafka 명령행 도구
kafka-consumer-groups.sh
# Consumer Group 상태 확인
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-consumer-group출력 예시:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group my-topic 0 1000 1050 50 consumer-1-xxx /192.168.1.10 consumer-1
my-group my-topic 1 2000 2000 0 consumer-2-xxx /192.168.1.11 consumer-2
my-group my-topic 2 1500 1600 100 consumer-3-xxx /192.168.1.12 consumer-3
전체 Lag 합계
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-consumer-group \
| awk 'NR>1 {sum+=$6} END {print "Total Lag: " sum}'2. Java Consumer API
// Consumer 내부에서 Lag 확인
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
for (TopicPartition partition : consumer.assignment()) {
long currentPosition = consumer.position(partition);
long endOffset = endOffsets.get(partition);
long lag = endOffset - currentPosition;
System.out.printf("Partition %s: position=%d, end=%d, lag=%d%n",
partition, currentPosition, endOffset, lag);
}3. AdminClient API
AdminClient admin = AdminClient.create(props);
// Consumer Group 오프셋 조회
ListConsumerGroupOffsetsResult offsetsResult =
admin.listConsumerGroupOffsets("my-consumer-group");
Map<TopicPartition, OffsetAndMetadata> offsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
// Log End Offset 조회
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(offsets.keySet());
// Lag 계산
for (TopicPartition partition : offsets.keySet()) {
long committed = offsets.get(partition).offset();
long end = endOffsets.get(partition);
long lag = end - committed;
System.out.printf("Partition %s: lag=%d%n", partition, lag);
}4. JMX 메트릭
Consumer 메트릭
kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}
| 메트릭 | 설명 |
|---|---|
records-lag | 파티션별 Lag |
records-lag-max | 최대 Lag |
records-lag-avg | 평균 Lag |
JMX 접근
// JMX를 통한 메트릭 조회
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(
"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my-consumer");
Double lagMax = (Double) mBeanServer.getAttribute(objectName, "records-lag-max");
System.out.println("Max Lag: " + lagMax);모니터링 도구 연동
Prometheus + Grafana
kafka_exporter 사용
# docker-compose.yml
kafka-exporter:
image: danielqsj/kafka-exporter
command:
- '--kafka.server=kafka:9092'
ports:
- "9308:9308"주요 메트릭
# Consumer Group Lag
kafka_consumergroup_lag
# 특정 그룹의 총 Lag
sum(kafka_consumergroup_lag{consumergroup="my-group"})
# Lag 증가율
rate(kafka_consumergroup_lag[5m])Grafana 대시보드 쿼리
# 파티션별 Lag
kafka_consumergroup_lag{consumergroup="$consumer_group", topic="$topic"}
# Lag 추이
increase(kafka_consumergroup_lag{consumergroup="$consumer_group"}[1h])
# Lag 알림 조건
kafka_consumergroup_lag > 10000Burrow (LinkedIn)
Kafka Consumer Lag 체크 전용 도구입니다.
# burrow.toml 설정
[zookeeper]
servers = ["zk1:2181", "zk2:2181"]
[kafka.cluster]
brokers = ["kafka1:9092", "kafka2:9092"]
[consumer.group]
class-name = "kafka"
[httpserver]
port = 8000Burrow API
# Consumer Group 상태
curl http://localhost:8000/v3/kafka/cluster/consumer/my-group/status
# Lag 조회
curl http://localhost:8000/v3/kafka/cluster/consumer/my-group/lagDatadog / New Relic
// Datadog 메트릭 전송
StatsDClient statsd = new NonBlockingStatsDClient("kafka", "localhost", 8125);
// Lag 메트릭 전송
statsd.gauge("kafka.consumer.lag", lag,
"consumer_group:" + groupId,
"topic:" + topic,
"partition:" + partition);알림 설정
알림 기준
| 수준 | Lag 기준 | 조치 |
|---|---|---|
| Info | > 1,000 | 모니터링 |
| Warning | > 10,000 | 확인 필요 |
| Critical | > 100,000 | 즉시 대응 |
Prometheus AlertManager 규칙
groups:
- name: kafka-consumer-alerts
rules:
- alert: KafkaConsumerLagWarning
expr: kafka_consumergroup_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "High Consumer Lag detected"
description: "Consumer group {{ $labels.consumergroup }} has lag {{ $value }}"
- alert: KafkaConsumerLagCritical
expr: kafka_consumergroup_lag > 100000
for: 2m
labels:
severity: critical
annotations:
summary: "Critical Consumer Lag"
description: "Consumer group {{ $labels.consumergroup }} lag is critically high"
- alert: KafkaConsumerLagIncreasing
expr: rate(kafka_consumergroup_lag[10m]) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer Lag increasing rapidly"Lag 해결 방안
1. Consumer 스케일 아웃
// 파티션 수 = Consumer 수 권장
// 토픽 파티션: 6개
// Consumer Group: 6개 Consumer2. 처리 로직 최적화
// Before: 동기 처리
for (ConsumerRecord<String, String> record : records) {
processSync(record); // 100ms per record
}
// After: 비동기 처리
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
futures.add(executor.submit(() -> processAsync(record)));
}
// 모든 처리 완료 대기
for (Future<?> future : futures) {
future.get();
}
consumer.commitSync();3. 배치 크기 조정
// 처리량 증가를 위한 배치 크기 증가
props.put("max.poll.records", "1000");
props.put("fetch.min.bytes", "1048576"); // 1MB4. 파티션 재분배
# 파티션 수 증가 (주의: 되돌릴 수 없음)
kafka-topics.sh --alter \
--topic my-topic \
--partitions 12 \
--bootstrap-server localhost:90925. Consumer 설정 튜닝
// 긴 처리 시간 허용
props.put("max.poll.interval.ms", "600000"); // 10분
// 더 많은 데이터 가져오기
props.put("fetch.max.bytes", "104857600"); // 100MBLag 모니터링 Best Practices
1. 다중 지표 활용
# Lag 절대값
kafka_consumergroup_lag
# Lag 증가율
rate(kafka_consumergroup_lag[5m])
# Consumer 처리율
rate(kafka_consumer_records_consumed_total[5m])2. 기준선 설정
평상시 Lag: 100-500
허용 범위: < 5,000
경고: > 10,000
위험: > 50,000
3. 대시보드 구성
[ Consumer Group Overview ]
├── Total Lag across all partitions
├── Lag per partition (heatmap)
├── Lag trend (time series)
├── Consumer count
└── Messages consumed rate
[ Alerts Panel ]
├── Active critical alerts
├── Recent warnings
└── Lag increase rate alerts
4. 정기 점검
# 일일 점검 스크립트
#!/bin/bash
GROUPS=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list)
for GROUP in $GROUPS; do
echo "=== $GROUP ==="
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group $GROUP \
| grep -v "^$"
done5. 자동 스케일링 연동
# Kubernetes HPA 예시
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: kafka-consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: kafka-consumer
minReplicas: 2
maxReplicas: 10
metrics:
- type: External
external:
metric:
name: kafka_consumergroup_lag
selector:
matchLabels:
consumergroup: my-group
target:
type: AverageValue
averageValue: "5000"
댓글 (0)