Authentication (인증)

Kafka에서 클라이언트와 브로커의 신원을 확인하는 인증 메커니즘을 살펴봅니다.

인증 개요

지원 프로토콜

┌─────────────────────────────────────────────────────────────────┐
│                  Kafka Authentication Methods                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│  │      SSL        │  │      SASL       │  │  SSL + SASL     │ │
│  │   (인증서)       │  │   (다양한 방식)  │  │    (결합)       │ │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘ │
│                                                                 │
│  SASL 메커니즘:                                                  │
│  ├── PLAIN         : 사용자명/비밀번호 (단순)                     │
│  ├── SCRAM-SHA-256 : 솔티드 해시 (권장)                          │
│  ├── SCRAM-SHA-512 : 더 강력한 해시                              │
│  ├── GSSAPI        : Kerberos                                   │
│  └── OAUTHBEARER   : OAuth 2.0 토큰                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

보안 프로토콜

프로토콜암호화인증설명
PLAINTEXT보안 없음
SSL✓ (인증서)TLS 암호화 + 인증서 인증
SASL_PLAINTEXT✓ (SASL)SASL 인증만
SASL_SSL✓ (SASL)TLS 암호화 + SASL 인증

SSL 인증

인증서 생성

# 1. CA 키 및 인증서 생성
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
    -subj "/CN=Kafka-CA" -nodes
 
# 2. Broker 키스토어 생성
keytool -keystore kafka.server.keystore.jks -alias localhost \
    -validity 365 -genkey -keyalg RSA \
    -dname "CN=localhost" -storepass changeit
 
# 3. CSR 생성
keytool -keystore kafka.server.keystore.jks -alias localhost \
    -certreq -file cert-request -storepass changeit
 
# 4. CA로 서명
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-request \
    -out cert-signed -days 365 -CAcreateserial
 
# 5. CA 인증서와 서명된 인증서 가져오기
keytool -keystore kafka.server.keystore.jks -alias CARoot \
    -import -file ca-cert -storepass changeit -noprompt
keytool -keystore kafka.server.keystore.jks -alias localhost \
    -import -file cert-signed -storepass changeit
 
# 6. 트러스트스토어 생성
keytool -keystore kafka.server.truststore.jks -alias CARoot \
    -import -file ca-cert -storepass changeit -noprompt

Broker 설정

# server.properties
 
# 리스너 설정
listeners = SSL://0.0.0.0:9093
advertised.listeners = SSL://kafka1.example.com:9093
 
# SSL 설정
ssl.keystore.location = /path/to/kafka.server.keystore.jks
ssl.keystore.password = changeit
ssl.key.password = changeit
ssl.truststore.location = /path/to/kafka.server.truststore.jks
ssl.truststore.password = changeit
 
# 클라이언트 인증
ssl.client.auth = required  # none, requested, required

클라이언트 설정

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
 
// 클라이언트 인증서 (ssl.client.auth=required 시)
props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
props.put("ssl.keystore.password", "changeit");
props.put("ssl.key.password", "changeit");

SASL/PLAIN

Broker 설정

# server.properties
listeners = SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol = SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol = PLAIN
sasl.enabled.mechanisms = PLAIN

JAAS 설정

# kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret"
    user_bob="bob-secret";
};
# Broker 시작 시 JAAS 파일 지정
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"

클라이언트 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" password=\"alice-secret\";");

SASL/SCRAM

사용자 생성

# SCRAM-SHA-256 사용자 생성
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter \
    --add-config 'SCRAM-SHA-256=[password=alice-secret]' \
    --entity-type users \
    --entity-name alice
 
# SCRAM-SHA-512 사용자 생성
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=bob-secret]' \
    --entity-type users \
    --entity-name bob
 
# 사용자 목록 확인
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe \
    --entity-type users

Broker 설정

# server.properties
listeners = SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol = SASL_SSL
sasl.mechanism.inter.broker.protocol = SCRAM-SHA-256
sasl.enabled.mechanisms = SCRAM-SHA-256,SCRAM-SHA-512

JAAS 설정

# kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

클라이언트 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" password=\"alice-secret\";");
 
// SSL 설정
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "changeit");

SASL/GSSAPI (Kerberos)

Broker 설정

# server.properties
listeners = SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol = SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.enabled.mechanisms = GSSAPI
sasl.kerberos.service.name = kafka

JAAS 설정

# kafka_server_jaas.conf
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/kafka.keytab"
    principal="kafka/kafka1.example.com@EXAMPLE.COM";
};

클라이언트 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.jaas.config",
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "useTicketCache=true;");

SASL/OAUTHBEARER

Broker 설정

# server.properties
sasl.enabled.mechanisms = OAUTHBEARER
sasl.mechanism.inter.broker.protocol = OAUTHBEARER
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class = \
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class = \
    org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler

클라이언트 설정

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
props.put("sasl.oauthbearer.token.endpoint.url", "https://auth.example.com/oauth/token");

다중 리스너

내부/외부 분리

# server.properties
listeners = INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners = INTERNAL://internal.kafka:9092,EXTERNAL://external.kafka:9093
listener.security.protocol.map = INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_SSL
 
# 내부 통신 설정
inter.broker.listener.name = INTERNAL
 
# 리스너별 SASL 설정
listener.name.internal.sasl.enabled.mechanisms = PLAIN
listener.name.external.sasl.enabled.mechanisms = SCRAM-SHA-256

인증 메커니즘 선택

권장 가이드

환경권장 메커니즘이유
개발/테스트PLAIN단순함
프로덕션SCRAM-SHA-256보안 + 관리 용이
엔터프라이즈GSSAPI (Kerberos)중앙 집중 인증
클라우드OAUTHBEAREROAuth 통합

비교

보안 수준:
PLAIN < SCRAM-SHA-256 < SCRAM-SHA-512 ≈ GSSAPI

관리 복잡도:
PLAIN < SCRAM < OAUTHBEARER < GSSAPI

확장성:
PLAIN ≈ SCRAM < OAUTHBEARER ≈ GSSAPI

관련 문서