Encryption (암호화)
Kafka에서 데이터를 암호화하여 보호하는 방법을 살펴봅니다.
암호화 유형
전송 중 암호화 vs 저장 암호화
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Encryption │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 전송 중 암호화 (Encryption in Transit) │
│ - SSL/TLS로 네트워크 통신 암호화 │
│ - Client ↔ Broker, Broker ↔ Broker │
│ │
│ ┌─────────┐ TLS ┌─────────┐ TLS ┌─────────┐ │
│ │ Client │ ◄──────► │ Broker │ ◄──────► │ Broker │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 2. 저장 암호화 (Encryption at Rest) │
│ - 디스크에 저장된 데이터 암호화 │
│ - Kafka 자체 지원 없음 → 외부 솔루션 필요 │
│ │
│ ┌─────────┐ ┌─────────────────────┐ │
│ │ Broker │ ────────────────► │ 암호화된 디스크 │ │
│ └─────────┘ │ (LUKS, BitLocker) │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
SSL/TLS 설정
인증서 준비
# CA 인증서 생성
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
-subj "/CN=Kafka-CA" -nodes
# Broker 키스토어 생성
keytool -keystore kafka.server.keystore.jks -alias broker \
-validity 365 -genkey -keyalg RSA -keysize 2048 \
-dname "CN=kafka.example.com" \
-ext SAN=DNS:kafka1.example.com,DNS:kafka2.example.com \
-storepass changeit -keypass changeit
# CSR 생성 및 서명
keytool -keystore kafka.server.keystore.jks -alias broker \
-certreq -file cert-request -storepass changeit
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-request \
-out cert-signed -days 365 -CAcreateserial \
-extensions v3_req
# 인증서 가져오기
keytool -keystore kafka.server.keystore.jks -alias CARoot \
-import -file ca-cert -storepass changeit -noprompt
keytool -keystore kafka.server.keystore.jks -alias broker \
-import -file cert-signed -storepass changeit
# 트러스트스토어 생성
keytool -keystore kafka.server.truststore.jks -alias CARoot \
-import -file ca-cert -storepass changeit -nopromptBroker SSL 설정
# server.properties
# SSL 리스너
listeners = PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners = PLAINTEXT://kafka1:9092,SSL://kafka1:9093
# 키스토어 설정
ssl.keystore.location = /path/to/kafka.server.keystore.jks
ssl.keystore.password = changeit
ssl.key.password = changeit
# 트러스트스토어 설정
ssl.truststore.location = /path/to/kafka.server.truststore.jks
ssl.truststore.password = changeit
# 클라이언트 인증 (선택)
ssl.client.auth = required # none, requested, required
# 프로토콜 및 암호 스위트
ssl.enabled.protocols = TLSv1.2,TLSv1.3
ssl.protocol = TLSv1.3Broker 간 SSL
# Inter-Broker 통신 암호화
security.inter.broker.protocol = SSL
# 또는 SASL + SSL
security.inter.broker.protocol = SASL_SSL클라이언트 SSL 설정
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9093");
props.put("security.protocol", "SSL");
// 트러스트스토어 (서버 인증서 검증)
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
// 키스토어 (클라이언트 인증서, ssl.client.auth=required 시)
props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
props.put("ssl.keystore.password", "changeit");
props.put("ssl.key.password", "changeit");
// 호스트명 검증
props.put("ssl.endpoint.identification.algorithm", "https");TLS 버전 및 암호 스위트
TLS 버전
# 권장: TLS 1.2 이상
ssl.enabled.protocols = TLSv1.2,TLSv1.3
ssl.protocol = TLSv1.3암호 스위트
# 강력한 암호 스위트만 허용
ssl.cipher.suites = TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,\
TLS_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384저장 암호화
디스크 암호화
Kafka 자체는 저장 암호화를 제공하지 않으므로 외부 솔루션을 사용합니다.
# Linux LUKS
cryptsetup luksFormat /dev/sdb
cryptsetup luksOpen /dev/sdb kafka-data
mkfs.ext4 /dev/mapper/kafka-data
mount /dev/mapper/kafka-data /var/kafka-logs클라우드 암호화
# AWS EBS 암호화
aws ec2 create-volume \
--availability-zone us-east-1a \
--size 100 \
--volume-type gp3 \
--encrypted
# GCP 디스크 암호화 (기본 제공)
# Azure Disk Encryption애플리케이션 레벨 암호화
// Producer에서 암호화
public class EncryptingSerializer implements Serializer<String> {
private Cipher cipher;
@Override
public byte[] serialize(String topic, String data) {
try {
return cipher.doFinal(data.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new SerializationException(e);
}
}
}
// Consumer에서 복호화
public class DecryptingDeserializer implements Deserializer<String> {
private Cipher cipher;
@Override
public String deserialize(String topic, byte[] data) {
try {
return new String(cipher.doFinal(data), StandardCharsets.UTF_8);
} catch (Exception e) {
throw new SerializationException(e);
}
}
}성능 고려사항
SSL 오버헤드
┌─────────────────────────────────────────────────────────────────┐
│ SSL Performance Impact │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 일반적인 오버헤드: │
│ - 처리량: 20-30% 감소 │
│ - 레이턴시: 10-20% 증가 │
│ - CPU: 증가 (암호화/복호화) │
│ │
│ Zero Copy 비활성화: │
│ - SSL 사용 시 Zero Copy 불가 │
│ - 추가적인 데이터 복사 발생 │
│ │
└─────────────────────────────────────────────────────────────────┘
최적화
# Broker 설정
# SSL 핸드셰이크 스레드
num.network.threads = 8
# 연결 재사용
connections.max.idle.ms = 600000// 클라이언트 연결 풀
props.put("connections.max.idle.ms", "600000");인증서 관리
인증서 갱신
# 만료 확인
keytool -list -v -keystore kafka.server.keystore.jks \
| grep "Valid from"
# 새 인증서로 갱신 (롤링 재시작 필요)
# 1. 새 인증서를 트러스트스토어에 추가
# 2. 순차적으로 Broker 재시작
# 3. 이전 인증서 제거인증서 모니터링
# 만료 알림 스크립트
#!/bin/bash
EXPIRY=$(keytool -list -v -keystore kafka.server.keystore.jks \
-storepass changeit | grep "until" | head -1)
# 만료 30일 전 알림문제 해결
일반적인 오류
# 핸드셰이크 실패
javax.net.ssl.SSLHandshakeException
해결:
1. 인증서 만료 확인
2. 트러스트스토어에 CA 인증서 확인
3. 호스트명 일치 확인
# 호스트명 불일치
java.security.cert.CertificateException: No subject alternative names present
해결:
# SAN (Subject Alternative Name) 추가
keytool -genkey ... -ext SAN=DNS:kafka1.example.com
디버깅
# SSL 디버그 활성화
export KAFKA_OPTS="-Djavax.net.debug=ssl,handshake"
# 연결 테스트
openssl s_client -connect kafka1:9093 -tls1_3Best Practices
1. TLS 1.3 사용
ssl.protocol = TLSv1.3
ssl.enabled.protocols = TLSv1.32. 강력한 암호 스위트
ssl.cipher.suites = TLS_AES_256_GCM_SHA384,...3. 인증서 자동 갱신
# cert-manager (Kubernetes)
# HashiCorp Vault
# Let's Encrypt (외부 노출 시)4. 내부/외부 분리
# 내부: 성능 우선
listeners = PLAINTEXT://internal:9092,SSL://external:9093
# 외부: 보안 필수
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL5. 정기 감사
# 인증서 만료 모니터링
# 암호 스위트 취약점 확인
# TLS 버전 업데이트
댓글 (0)