Encryption (암호화)

Kafka에서 데이터를 암호화하여 보호하는 방법을 살펴봅니다.

암호화 유형

전송 중 암호화 vs 저장 암호화

┌─────────────────────────────────────────────────────────────────┐
│                    Kafka Encryption                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. 전송 중 암호화 (Encryption in Transit)                       │
│     - SSL/TLS로 네트워크 통신 암호화                              │
│     - Client ↔ Broker, Broker ↔ Broker                          │
│                                                                 │
│  ┌─────────┐   TLS    ┌─────────┐   TLS    ┌─────────┐        │
│  │ Client  │ ◄──────► │ Broker  │ ◄──────► │ Broker  │        │
│  └─────────┘          └─────────┘          └─────────┘        │
│                                                                 │
│  2. 저장 암호화 (Encryption at Rest)                             │
│     - 디스크에 저장된 데이터 암호화                               │
│     - Kafka 자체 지원 없음 → 외부 솔루션 필요                     │
│                                                                 │
│  ┌─────────┐                    ┌─────────────────────┐        │
│  │ Broker  │ ────────────────► │ 암호화된 디스크      │        │
│  └─────────┘                    │ (LUKS, BitLocker)   │        │
│                                 └─────────────────────┘        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

SSL/TLS 설정

인증서 준비

# CA 인증서 생성
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
    -subj "/CN=Kafka-CA" -nodes
 
# Broker 키스토어 생성
keytool -keystore kafka.server.keystore.jks -alias broker \
    -validity 365 -genkey -keyalg RSA -keysize 2048 \
    -dname "CN=kafka.example.com" \
    -ext SAN=DNS:kafka1.example.com,DNS:kafka2.example.com \
    -storepass changeit -keypass changeit
 
# CSR 생성 및 서명
keytool -keystore kafka.server.keystore.jks -alias broker \
    -certreq -file cert-request -storepass changeit
 
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-request \
    -out cert-signed -days 365 -CAcreateserial \
    -extensions v3_req
 
# 인증서 가져오기
keytool -keystore kafka.server.keystore.jks -alias CARoot \
    -import -file ca-cert -storepass changeit -noprompt
keytool -keystore kafka.server.keystore.jks -alias broker \
    -import -file cert-signed -storepass changeit
 
# 트러스트스토어 생성
keytool -keystore kafka.server.truststore.jks -alias CARoot \
    -import -file ca-cert -storepass changeit -noprompt

Broker SSL 설정

# server.properties
 
# SSL 리스너
listeners = PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners = PLAINTEXT://kafka1:9092,SSL://kafka1:9093
 
# 키스토어 설정
ssl.keystore.location = /path/to/kafka.server.keystore.jks
ssl.keystore.password = changeit
ssl.key.password = changeit
 
# 트러스트스토어 설정
ssl.truststore.location = /path/to/kafka.server.truststore.jks
ssl.truststore.password = changeit
 
# 클라이언트 인증 (선택)
ssl.client.auth = required  # none, requested, required
 
# 프로토콜 및 암호 스위트
ssl.enabled.protocols = TLSv1.2,TLSv1.3
ssl.protocol = TLSv1.3

Broker 간 SSL

# Inter-Broker 통신 암호화
security.inter.broker.protocol = SSL
 
# 또는 SASL + SSL
security.inter.broker.protocol = SASL_SSL

클라이언트 SSL 설정

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9093");
props.put("security.protocol", "SSL");
 
// 트러스트스토어 (서버 인증서 검증)
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
 
// 키스토어 (클라이언트 인증서, ssl.client.auth=required 시)
props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
props.put("ssl.keystore.password", "changeit");
props.put("ssl.key.password", "changeit");
 
// 호스트명 검증
props.put("ssl.endpoint.identification.algorithm", "https");

TLS 버전 및 암호 스위트

TLS 버전

# 권장: TLS 1.2 이상
ssl.enabled.protocols = TLSv1.2,TLSv1.3
ssl.protocol = TLSv1.3

암호 스위트

# 강력한 암호 스위트만 허용
ssl.cipher.suites = TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,\
    TLS_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

저장 암호화

디스크 암호화

Kafka 자체는 저장 암호화를 제공하지 않으므로 외부 솔루션을 사용합니다.

# Linux LUKS
cryptsetup luksFormat /dev/sdb
cryptsetup luksOpen /dev/sdb kafka-data
mkfs.ext4 /dev/mapper/kafka-data
mount /dev/mapper/kafka-data /var/kafka-logs

클라우드 암호화

# AWS EBS 암호화
aws ec2 create-volume \
    --availability-zone us-east-1a \
    --size 100 \
    --volume-type gp3 \
    --encrypted
 
# GCP 디스크 암호화 (기본 제공)
# Azure Disk Encryption

애플리케이션 레벨 암호화

// Producer에서 암호화
public class EncryptingSerializer implements Serializer<String> {
    private Cipher cipher;
 
    @Override
    public byte[] serialize(String topic, String data) {
        try {
            return cipher.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }
}
 
// Consumer에서 복호화
public class DecryptingDeserializer implements Deserializer<String> {
    private Cipher cipher;
 
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return new String(cipher.doFinal(data), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }
}

성능 고려사항

SSL 오버헤드

┌─────────────────────────────────────────────────────────────────┐
│                    SSL Performance Impact                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  일반적인 오버헤드:                                               │
│  - 처리량: 20-30% 감소                                           │
│  - 레이턴시: 10-20% 증가                                         │
│  - CPU: 증가 (암호화/복호화)                                      │
│                                                                 │
│  Zero Copy 비활성화:                                             │
│  - SSL 사용 시 Zero Copy 불가                                    │
│  - 추가적인 데이터 복사 발생                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

최적화

# Broker 설정
# SSL 핸드셰이크 스레드
num.network.threads = 8
 
# 연결 재사용
connections.max.idle.ms = 600000
// 클라이언트 연결 풀
props.put("connections.max.idle.ms", "600000");

인증서 관리

인증서 갱신

# 만료 확인
keytool -list -v -keystore kafka.server.keystore.jks \
    | grep "Valid from"
 
# 새 인증서로 갱신 (롤링 재시작 필요)
# 1. 새 인증서를 트러스트스토어에 추가
# 2. 순차적으로 Broker 재시작
# 3. 이전 인증서 제거

인증서 모니터링

# 만료 알림 스크립트
#!/bin/bash
EXPIRY=$(keytool -list -v -keystore kafka.server.keystore.jks \
    -storepass changeit | grep "until" | head -1)
# 만료 30일 전 알림

문제 해결

일반적인 오류

# 핸드셰이크 실패
javax.net.ssl.SSLHandshakeException

해결:
1. 인증서 만료 확인
2. 트러스트스토어에 CA 인증서 확인
3. 호스트명 일치 확인
# 호스트명 불일치
java.security.cert.CertificateException: No subject alternative names present

해결:
# SAN (Subject Alternative Name) 추가
keytool -genkey ... -ext SAN=DNS:kafka1.example.com

디버깅

# SSL 디버그 활성화
export KAFKA_OPTS="-Djavax.net.debug=ssl,handshake"
 
# 연결 테스트
openssl s_client -connect kafka1:9093 -tls1_3

Best Practices

1. TLS 1.3 사용

ssl.protocol = TLSv1.3
ssl.enabled.protocols = TLSv1.3

2. 강력한 암호 스위트

ssl.cipher.suites = TLS_AES_256_GCM_SHA384,...

3. 인증서 자동 갱신

# cert-manager (Kubernetes)
# HashiCorp Vault
# Let's Encrypt (외부 노출 시)

4. 내부/외부 분리

# 내부: 성능 우선
listeners = PLAINTEXT://internal:9092,SSL://external:9093
 
# 외부: 보안 필수
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL

5. 정기 감사

# 인증서 만료 모니터링
# 암호 스위트 취약점 확인
# TLS 버전 업데이트

관련 문서