들어가며
이번 포스팅에서는 MSA 기반 시스템에서 이벤트 파이프라인 역할을 주로 하는 Kafka(카프카)의 보안 방식에 대해 알아보도록 한다.
카프카 클러스터는 기본적으로 암호화 인증이나 ACLs 설정이 되어 있지 않다. 그래서 모든 클라이언트는 PLAINTEXT 포트를 사용하여 통신이 가능하다. 또한 네트워크 분할 and/or ACLs를 사용하여 신뢰할 수 있는 IP만 접근이 가능하도록 제한할 수 있다.
테스트 환경 설치
vagrantfile을 통해 테스트 환경을 구축한다.
## vagrantfile
Vagrant.configure("2") do |config|
config.vm.box = "bento/ubuntu-20.04"
config.vm.provider :virtualbox do |vb|
# # Don't boot with headless mode
# vb.gui = true
#
# # Use VBoxManage to customize the VM. For example to change memory:
vb.customize ["modifyvm", :id, "--memory", "4096", "--cpus", "2"]
config.vm.network :forwarded_port, guest: 9092, host: 9092
config.vm.network :forwarded_port, guest: 2181, host: 2181
config.vm.network "private_network", ip: "192.168.1.200"
end
sudo apt-get install -y wget net-tools netcat tar openjdk-8-jdk
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz
ln -s kafka_2.11-1.0.0 kafka
ll kafka
#주키퍼 실행
~/kafka/bin/zookeeper-server-start.sh -daemon ~/kafka/config/zookeeper.properties
tail -n 5 ~/kafka/logs/zookeeper.out
echo "ruok" | nc localhost 2181 ; echo #Zookeeper의 정상 동작여부 확인
#카프카 실행
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties
tail -n 10 ~/kafka/logs/kafkaServer.out
(참고) 서비스 형태로 등록 및 실행
sudo vi /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/vagrant/kafka/bin/zookeeper-server-start.sh /home/vagrant/kafka/config/zookeeper.properties
ExecStop=/home/vagrant/kafka/bin/zookeeper-server-stop.sh
[Install]
WantedBy=multi-user.target
sudo vi /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
ExecStart=/home/vagrant/kafka/bin/kafka-server-start.sh /home/vagrant/kafka/config/server.properties
ExecStop=/home/vagrant/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
## 실행
sudo systemctl enable zookeeper
sudo systemctl status zookeeper
sudo systemctl enable kafka
sudo systemctl status kafka
Producer와 Consumer test
토픽을 발행하는 쪽을 Producer, 토픽을 사용하는 쪽을 Consumer라고 한다. Port 2181(Zookeeper), 9092(Kafka Plantext) 포트가 열려 있어야 한다.
# 환경설정 변경
vim kafka/config/server.properties
#advertised.listner: 외부에서 내부로 연결하기 위한 설정
#localhost = My Public Dns ex)192.168.33.55
advertised.listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
sudo systemctl restart kafka
# 토픽 생성
~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-security-topic --replication-factor 1 --partitions 2
# Producer/Consumer 테스트
# Producer
~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-security-topic
# Consumer
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-security-topic
암호화
SSL Encryption
SSL 암호화를 통해 데이터를 보호할 수 있다. 하지만 카프카 브로커의 암/복호화를 통해 latency, CPU, RAM 소모량이 늘어나고, 컨슈머 역할을 하는 앱 클라이언트의 latency, CPU 소모량이 늘어난다.
Self signed certificate 생성을 위한 자체 CA(Certificate Authority) 생성
#경로 생성
mkdir ssl
cd ssl
# CA 생성
# key 길이 : 4096
# Certificate name = Kafka-Security-CA
# private key: ca-key
# public: ca-cert (인증서 외부에 공유하여 CA를 신뢰할 수 있도록 함)
# nodes = no DES: 개인키를 열 때 비밀번호로 보호하지 않겠다는 설정
# x509: 인증 알고리즘
# subj: 인증서 안에 들어갈 정보를 명시하는 부분
openssl req -new -newkey rsa:4096 -days 365 -x509 -subj "/CN=Kafka-Security-CA" -keyout ca-key -out ca-cert -nodes
keystore 생성
#server password 환경변수 설정
export SRVPASS=serversecret
cd ssl
#Keystore 생성
#pkcs12 : 산업표준 포맷
keytool -genkey -keystore kafka.server.keystore.jks -validity 365 -storepass $SRVPASS -keypass $SRVPASS -dname "CN=192.168.1.200" -storetype pkcs12
#Keystore 확인
keytool -list -v -keystore kafka.server.keystore.jks
인증서 서명 요청
keytool -keystore kafka.server.keystore.jks -certreq -file cert-file -storepass $SRVPASS -keypass $SRVPASS
ll
#cert-file 생성
#CA를 거칠 경우 cert-file을 통해 서명요청을 보낸다.
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$SRVPASS
ll
#서명된 cert-signed 파일이 생성된다.
keytool -printcert -v -file cert-signed
# Owner = 192.168.1.200
# Issuer : Kafka-Security-CA
keystore에 CA 인증서 정보와 서명된 서버 인증서 정보 추가
# keystore에 public CA key 정보를 넣는다.
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $SRVPASS -keypass $SRVPASS -noprompt
keytool -keystore kafka.server.keystore.jks -import -file cert-signed -storepass $SRVPASS -keypass $SRVPASS -noprompt
truststore 생성
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $SRVPASS -keypass $SRVPASS -noprompt
#ll
# kafka.server.truststore.jks파일이 생성된다.
kafka broker에 keystore 적용
sudo vi ~/kafka/config/server.properties
# 다음의 내용 설정
# 기존
#######
advertised.listeners=PLAINTEXT://192.168.1.200:9092
zookeeper.connect=localhost:2181
#######
#변경
#######
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://192.168.1.200:9092,SSL://192.168.1.200:9093
zookeeper.connect=localhost:2181
ssl.keystore.location=/home/ubuntu/ssl/kafka.server.keystore.jks
ssl.keystore.password=serversecret
ssl.key.password=serversecret
ssl.truststore.location=/home/ubuntu/ssl/kafka.server.truststore.jks
ssl.truststore.password=serversecret
브로커에 keystore, truststore 적용
systemctl restart kafka
systemctl status kafka
sudo grep "EndPoint" /home/vagrant/kafka/logs/server.log
# 다음과 같이 등록된 정보와 리스너 정보가 나와야한다.
#[2021-09-15 04:49:06,533] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(192.168.1.200,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(192.168.1.200,9093,ListenerName(SSL),SSL) (kafka.utils.ZkUtils)
이 단계가 끝나면 cert-file은 더 이상 필요 없다. ca-key 파일과, keystorefile은 절대 외부로 유출하지 말아야 한다.
클라이언트에 Trust store 적용
openssl s_client -connect 192.168.1.200:9093
## 출력 CONNETCED가 나오면 성공이다.
CONNECTED(00000003)
139955897906496:error:1408F10B:SSL routines:ssl3_get_record:wrong version number:../ssl/record/ssl3_record.c:331:
---
no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 283 bytes
Verification: OK
---
New, (NONE), Cipher is (NONE)
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
Early data was not sent
Verify return code: 0 (ok)
---
#비밀번호 저장
export CLIPASS=clientpass
# ssl 관련 파일 저장
mkdir ssl
cd ssl
# truststore 만들기
# 1. chane of trust. Importing CA
# 2. surver certificate 다이렉트 -> 모든 카프카 브로커마다 truststore를 생성해주어야함.
# certificate file 다운로드
scp -i ~/kafka-security.pem vagrant@192.168.1.200:/home/vagrant/ssl/ca-cert .
ll
# ca-cert 다운로드 됨
# 클라이언트 truststore 생성
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $CLIPASS -keypass $CLIPASS -noprompt
keytool -list -v -keystore kafka.client.truststore.jks
#카프카 client 환경설정 파일 생성
vim client.properties
##내용추가
security.protocol=SSL
ssl.truststore.location=/root/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=clientpass
SSL consumer, producer 테스트
# Producer 실행
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.200:9093 --topic kafka-security-topic --producer.config ~/kafka/ssl/client.properties
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9093 --topic kafka-security-topic --consumer.config ~/kafka/ssl/client.properties
SASL(Simple Authentication and Security Layer)
인터넷 프로토콜에서 인증과 데이터 보안을 위한 프레임 워크이며 애플리케이션 프로토콜들로부터 인증 메커니즘을 분리시킨다. Kafka는 kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 한다.
SASL(Hadoop, Kafka, etc..)은 빅데이터 애플리케이션들의 보안 표준이 되고 있다. SASL은 SASL 티켓 하나만 있으면 애플리케이션 프로토콜을 변경하지 않고 많은 시스템에 접속할 수 있도록 하기 때문에 인기가 있으며, SSL/TLS 와 조합되어 주로 사용된다.
Kafka의 SASL은 다음의 프로토콜을 지원한다.
- PLANE(username / password)
- SCRAM(modern username / password with challenge)
- GSSAPI (Kerberos authentication / Active Directory authenticaton)
- OAUTHBEARER (WIP - KIP 255) for OAuth 2 support
이 중에서 가장 복잡하지만 많이 사용되는 Kerberos Authentication을 사용해 보기로 한다.
Kuberos Authentication
MIT에서 만든 인증 프로토콜로, 데이터 교환은 암호화되고 서비스 책임자(믿을 수 있는 3rd-party 기관, KDC - Key Distribution Center)에 의해 ticket을 발급받아서 사용한다. Microsoft Active Directory 제품을 가장 많이 사용하는데, 예제에서는 MIT의 오픈 kerberos를 사용한다.
Kerberos Authetiucation의 특징은 다음과 같다.
- 모든 통신은 암호화 되어 있다.
- 패스워드는 통신을 통해 전달되지 않는다.
- 티켓이 만료될 수 있으므로 서버 시간 동기화가 필요하다.
- 클라이언트들은 valid 할 때까지 자동으로 티켓을 갱신한다.
- 클라이언트는 KDC와 상호작용 한다.
- 서버는 KDC와 통신하지 않는다.
환경설정 - 인스턴스 생성
# MIT-Kerberos 설치
sudo yum install -y krb5-server
sudo vi /var/kerberos/krb5kdc/kdc.conf
########
##수정
########
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
default_realm=KAFKA.SECURE
[realms]
KAFKA.SECURE = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
}
# 88번 포트 열기
sudo vi /var/kerberos/krb5kdc/kadm5.acl
########
##수정
########
*/admin@KAFKA.SECURE *
sudo vi /var/kerberos/krb5kdc/krb5.conf
########
##수정
########
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = KAFKA.SECURE
kdc_timesync = 1
ticket_lifetime = 24h
[realms]
KAFKA.SECURE = {
admin_server = 접속주소
kdc = 접속주소
}
# kerberos db 생성
sudo /usr/sbin/kdb5_util create -s -r KAFKA.SECURE -P this-is-unsecure
sudo kadmin.local -q "add_principal -pw this-is-unsecure admin/admin"
# Kerberos 서비스 시작 및 동작 상태 확인
sudo systemctl restart krb5kdc
sudo systemctl status krb5kdc
Kerberos 관리자 생성
Keytab: 서비스를 제공하는 모든 호스트에는 keytab이라는 localfile이 있음. 서비스키라는 해당 서비스에 대한 Identity가 있다.
sudo kadmin.local -q "add_principal -randkey reader@KAFKA.SECURE"
sudo kadmin.local -q "add_principal -randkey writer@KAFKA.SECURE"
sudo kadmin.local -q "add_principal -randkey admin@KAFKA.SECURE"
# Host는 kafka server ip를 설정해야한다.
sudo kadmin.local -q "add_principal -randkey kafka/192.168.1.200@KAFKA.SECURE"
# keytabs 생성
# Keytab 파일에 권한자 정보를 넘겨준다.
sudo kadmin.local -q "xst -kt /tmp/reader.user.keytab reader@KAFKA.SECURE"
sudo kadmin.local -q "xst -kt /tmp/writer.user.keytab writer@KAFKA.SECURE"
sudo kadmin.local -q "xst -kt /tmp/admin.user.keytab admin@KAFKA.SECURE"
sudo kadmin.local -q "xst -kt /tmp/kafka.service.keytab kafka/192.168.1.200@KAFKA.SECURE"
ll /tmp/*.keytab
## 출력
-rw------- 1 root root 506 Sep 28 15:48 /tmp/admin.user.keytab
-rw------- 1 root root 626 Sep 28 15:49 /tmp/kafka.service.keytab
-rw------- 1 root root 514 Sep 28 15:48 /tmp/reader.user.keytab
-rw------- 1 root root 514 Sep 28 15:48 /tmp/writer.user.keytab
## Local에 keytab 파일 가져오기
scp root@192.168.33.56:/tmp/*.keytab /tmp/
## Local keytab file 카프카 서버로 옮기기
scp /tmp/kafka.service.keytab vagrant@192.168.1.200:/tmp/
# kafka용 keytab 삭제
rm -f ./tmp/kafka.service.keytab
sudo chmod 600 ./tmp/*.keytab
Kerberos ticket 발행 테스트
# Ubuntu client 측
export DEBIAN_FRONTEND=noninteractive && sudo apt-get install -y krb5-user
sudo vi /etc/krb5.conf
##############
##수정
############
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = KAFKA.SECURE
kdc_timesync = 1
ticket_lifetime = 24h
[realms]
KAFKA.SECURE = {
admin_server = 192.168.33.56
kdc = 192.168.33.56
}
# 티켓 가져오기
kinit -kt /tmp/admin.user.keytab admin
klist
#####
## 출력
######
Valid starting Expires Service principal
09/28/21 08:24:35 09/29/21 08:24:34 krbtgt/KAFKA.SECURE@KAFKA.SECURE
Kafka에 Keberos Authentication 적용
# Kafka에서 Kerberos 인증 환경설정
sudo vi kafka/config/server.properties
####
##추가항목
####
listners에 ,SASL_SSL//0.0.0.0:9094 추가
advertiesd.listners에 ,SASL_SSL://192.168.1.200:9094 추가
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka #서비스 권한자와 매칭
## 추가 결과
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_SSL://0.0.0.0:9094
advertised.listeners=PLAINTEXT://192.168.1.200:9092,SSL://192.168.1.200:9093,SASL_SSL://192.168.1.200:9094
zookeeper.connect=localhost:2181
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
ssl.keystore.location=/home/vagrant/ssl/kafka.server.keystore.jks
ssl.keystore.password=serversecret
ssl.key.password=serversecret
ssl.truststore.location=/home/vagrant/ssl/kafka.server.truststore.jks
ssl.truststore.password=serversecret
ssl.client.auth=required
#Kafka server에 jaas config 파일 만들기
sudo vi kafka/config/kafka_server_jaas.conf
####
## 수정
####
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/tmp/kafka.service.keytab"
principal="kafka/192.168.33.56@KAFKA.SECURE";
};
# systemd scrpit에 반영
sudo vim /etc/systemd/system/kafka.service
####
##추가
####
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/home/vagrant/kafka/config/kafka_server_jaas.conf"
##추가 결과
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/home/vagrant/kafka/config/kafka_server_jaas.conf"
ExecStart=/home/vagrant/kafka/bin/kafka-server-start.sh /home/vagrant/kafka/config/server.properties
ExecStop=/home/vagrant/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
# 카프카 재시작
sudo systemctl daemon-reload
# 9094포트 오픈
# KDC 88 포트도 열기
Kafka Client 환경설정
# Kafka Client 속성 준비
# SASL connection을 위한 설정 추가
# Client가 TicketCashe를 사용하도록 지정
vim /tmp/kafka_client_jaas.conf
#####
##추가
#####
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
vim /tmp/kafka_client_kerberos.properties
#####
##추가
#####
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/home/devlos/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=clientpass
# 환경변수 설정
# Jass 위치 설정
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf"
kinit -kt /tmp/admin.user.keytab admin
Kafka Authorization
ACLs(Access Control Lists)을 이용한 방법
- 토픽 읽기/쓰기 권한 관리, 사용자별 컨슈머 그룹 사용 권한, 클라이언트의 토픽 생성/삭제 및 기타 설정 권한설정을 한다.
- 슈퍼유저는 ACLs 없이도 모든 권한을 가진다.
- User group 개념이 없기 때문에 각 사용자별로 권한을 가진다.
- ACLs는 Zookeeper에 저장된다.
- 카프카 관리자만 토픽 생성 및 ACLs 작성권한을 가진다.
Kerberos를 이용한 방법
# 1. 카프카 서버 설정
# Produce & consumer 메시지 준비
# kafka/config/server.properties 파일에 다음의 내용 추가
vim kafka/config/server.properties
#######
## 추가
#######
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 이 설정을 아이디에 적용하면 ACL 인증 과정을 거치지 않음
super.users=User:admin;User:kafka
arrow.everyone.if.no.acl.found=false
# 모든 SASL 관련 내용을 적용시킬 수 있음
security.inter.broker.protocol=SASL_SSL
sudo systemctl restart kafka
sudo systemctl status kafka
# 2. 토픽생성
kafka/bin/kafka-topics.sh \
--zookeeper 192.168.33.56:2181 \
--create \
--topic acl-test \
--replication-factor 1 --partitions 1
# user reader: acl-test 토픽 소비
# user writer: acl-test 토픽 생성
# user admin: acl-test 토픽 관련 모든 작업 가능
# 3. 권한 설정
# 컨슈머 그룹은 모든 그룹 접근이 가능하도록 설정
kafka-acl.sh \
--authorizer-properties zookeeper.connect=192.168.33.56:2181 --add \
--arrow-principal User:reader --allow-principal User:writer \
--opertation Read \
--group=* \
--topic acl-test
kafka-acl.sh \
--authorizer-properties zookeeper.connect=192.168.33.56:2181 --add \
--allow-principal User:writer \
--opertation Write \
--topic acl-test
# 결과확인
kafka-acl.sh \
--authorizer-properties zookeeper.connect=192.168.33.56:2181 \
--list \
--topic acl-test
# 4. 테스트
# 환경변수 설정
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf"
# Producer 실행
kdestroy
kinit -kt /tmp/writer.user.keytab writer
klist
kafka/bin/kafka-console-producer.sh \
--broker-list 192.168.33.56:9094 \
--topic acl-test \
-- producer.config /tmp/kafka_client_kerberos.properties
# Consumer 실행
kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.33.56:9094 \
--topic-acl-test \
--consumer.config /tmp/kafka_client_kerberos.properties
# 5. 권한 삭제
kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=192.168.33.56:2181 --remove \
--allow-principal User:reader \
--operation Read \
--topic acl-test
마치며
이번 포스팅에서는 Kerberos Authentication을 이용한 카프카 보안방식을 살펴보았다. 사실 예제를 통해 알아본 Kerberos Authentication은 아직까지 실무에는 도입해 보지 못했다. 아직까지 마이크로 서비스 토픽을 관리하는 구성원이 많지 않고, 이벤트 브로커가 내부망에서 동작하기 때문에 외부에서는 아예 접근을 할 수 없기 때문이다.
하지만 향후 마이크로서비스들을 통합하거나, 더 큰 시스템 환경이 도입되고 연관 부서가 더욱 많아지게 된다면 이러한 보안 구성이 필수적으로 적용되어야 할 것이다.