1. Apache Kafka?
LinkedIn에서 자사의 내부 데이터 처리를 위해 개발한 대용량 분산 메세징 시스템 입니다.
Kafka는 확장이 용이하고, 높은 처리량과 분산처리를 할수 있으며, In-Memory에 비해 뒤지지 않는 성능과 비휘발성으로 데이터를 디스크 저장 한다는 장점이 있습니다.
Kafka를 활용하는 대표적인 기업으로는 LinkedIn, Twitter, Netflix, Tumblr, Foursquare가 사용하는 것으로 알고 있으며, 그 외에도 대용량 메세징을 다루는 기업에서 활용 중인걸로 알고 있습니다.
처리할 로그와 메세지들은 넘쳐나고, 안전을 생각하자니 In-Memory는 안될 것 같고, 디스크로 처리 하자니 성능이 발목을 잡고.. 이런 걱정 하시는 분들에게 좋은 솔루션인듯 합니다.
2. Kafka Architecture
Kafka는 Producer, Broker, Consumer로 나누어져 있고, 시스템에서 발생 되는 메세지를 topic으로 분류 하고 있습니다.
이해를 돕기 위해 아래 그림을 통해 원리를 알아보도록 하겠습니다.
1. 시스템에서 발생 되는 메세지를 Producer가 topic으로 분류하여 Broker에게 던져 줍니다. 2. Broker는 topic을 통해 흘러들어온 메세지들을 각각의 Partition에 저장 합니다. 저장 이후 각각의 Follower들은 장애 발생을 대비하여, 무작위로 자신의 Partition에 Leader의 topic을 복제 합니다. 3. Consumer가 원하는 topic을 선정하여, Broker로 부터 데이터를 가져 옵니다.
Figure 1. Kafka Architecture
3. 실제 구성
위의 Architecture를 토대로 실제로 구성 해보면서 이해를 해보도록 하겠습니다.
서버는 총 9대로 producer 3대, broker 3대, consumer 3대로 구성하겠습니다.
각각의 서버의 Hostname, IP는 아래와 같습니다.
kafka-producer-1 : 172.17.1.239 kafka-producer-2 : 172.17.1.240 kafka-producer-3 : 172.17.1.241 kafka-broker-1 : 172.17.1.242 kafka-broker-2 : 172.17.1.243 kafka-broker-3 : 172.17.1.244 kafka-consumer-1 : 172.17.1.245 kafka-consumer-2 : 172.17.1.246 kafka-consumer-3 : 172.17.1.247
3.1. 공통 설정
– Hostname 별칭 설정
producer와 consumer에서 이 부분이 설정 되어있지 않으면, https://gist.github.com/ruo91/980719a9f7e1f7ef72c5 와 같은 에러를 만나 볼수 있으며, 특히 topic을 생성시에 무한대의 Exception 에러를 만나 보실수 있습니다. ㅋㅋ
root@ruo91:~# echo '172.17.1.239 kafka-producer-1' >> /etc/hosts root@ruo91:~# echo '172.17.1.240 kafka-producer-2' >> /etc/hosts root@ruo91:~# echo '172.17.1.241 kafka-producer-3' >> /etc/hosts root@ruo91:~# echo '172.17.1.242 kafka-broker-1' >> /etc/hosts root@ruo91:~# echo '172.17.1.243 kafka-broker-2' >> /etc/hosts root@ruo91:~# echo '172.17.1.244 kafka-broker-3' >> /etc/hosts root@ruo91:~# echo '172.17.1.245 kafka-consumer-1' >> /etc/hosts root@ruo91:~# echo '172.17.1.246 kafka-consumer-2' >> /etc/hosts root@ruo91:~# echo '172.17.1.247 kafka-consumer-3' >> /etc/hosts
– JDK 설치
root@ruo91:~# curl -LO "http://download.oracle.com/otn-pub/java/jdk/8u11-b12/jdk-8u11-linux-x64.tar.gz" \ -H 'Cookie: oraclelicense=accept-securebackup-cookie' root@ruo91:~# tar xzf jdk-8u11-linux-x64.tar.gz root@ruo91:~# mv jdk1.8.0_11 /usr/local/jdk root@ruo91:~# rm -f jdk-8u11-linux-x64.tar.gz root@ruo91:~# echo '# JDK' >> /etc/profile root@ruo91:~# echo 'export JAVA_HOME=/usr/local/jdk" >> /etc/profile root@ruo91:~# echo 'export PATH="$PATH:$JAVA_HOME/bin"' >> /etc/profile root@ruo91:~# echo '' >> /etc/profile
– Kafka 설치
Kafka 0.8.1.1 버전을 사용하겠습니다.
root@ruo91:~# curl -LO "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz" \ root@ruo91:~# tar xzf kafka_2.10-0.8.1.1.tgz root@ruo91:~# mv kafka_2.10-0.8.1.1.tgz /opt/kafka root@ruo91:~# rm -rf kafka_2.10-0.8.1.1.tgz root@ruo91:~# echo '# Apache Kafka' >> /etc/profile root@ruo91:~# echo "export KAFKA_HOME=/opt/kafka" >> /etc/profile root@ruo91:~# echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
3.2. Broker
Broker에서는 config/zookeeper.properties, config/server.properties 설정 파일을 수정 합니다.
– ZooKeeper 설정
기존에 kafka에서 포함 되어 있는 config/zookeeper.properties 설정 파일에는 tickTime, initLimit, syncLimit 옵션들이 빠져 있습니다.
이 옵션들을 추가하고, 동일한 standalone 형태의 ZooKeeper들을 복제 모드로 설정 합니다.
root@kafka-broker:~# echo 'tickTime=2000' >> /opt/kafka/config/zookeeper.properties root@kafka-broker:~# echo 'initLimit=10' >> /opt/kafka/config/zookeeper.properties root@kafka-broker:~# echo 'syncLimit=5' >> /opt/kafka/config/zookeeper.properties
데이터 디렉토리를 변경하고, 복제 모드로 설정을 합니다.
root@kafka-broker:~# sed -i '/^dataDir/ s:.*:dataDir=/opt/zk-data:' /opt/kafka/config/zookeeper.properties root@kafka-broker:~# echo 'server.1=kafka-broker-1:2888:3888' >> /opt/kafka/config/zookeeper.properties root@kafka-broker:~# echo 'server.2=kafka-broker-2:2888:3888' >> /opt/kafka/config/zookeeper.properties root@kafka-broker:~# echo 'server.3=kafka-broker-3:2888:3888' >> /opt/kafka/config/zookeeper.properties
ZooKeeper 서버들에게 myid 번호를 부여 합니다.
kafka-broker-1의 경우 myid는 1, kafka-broker-2의 경우 myid는 2, kafka-broker-3의 경우 myid는 3으로 설정 합니다.
root@kafka-broker:~# mkdir /opt/zk-data root@kafka-broker:~# echo '1' > /opt/zk-data/myid
– Broker 설정
config/server.properties의 broker.id를 설정합니다.
kafka-broker-1에는 1, kafka-broker-2에는 2, kafka-broker-3에는 3으로 변경 합니다.
root@kafka-broker:~# broker.id=1 root@kafka-broker:~# sed -i '/^broker.id/ s:.*:broker.id=1:' /opt/kafka/config/server.properties
Broker의 partition 및 서버 로그가 생성 될 디렉토리를 변경 합니다.
root@kafka-broker:~# sed -i '/^log.dirs/ s:.*:log.dirs=/opt/kafka/logs:' /opt/kafka/config/server.properties
ZooKeeper 서버들을 추가 합니다.
root@kafka-broker:~# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181/g' /opt/kafka/config/server.properties
– ZooKeeper 실행
root@kafka-broker:~# zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
– Broker 실행
Broker 서버를 실행과 동시에 kafka-broker-1 서버가 리더로 선출된 것을 확인 할수 있고, 나머지 kafka-broker-2, 3은 Follower로써 역할을 수행하게 됩니다.
root@kafka-broker:~# kafka-server-start.sh /opt/kafka/config/server.properties [2014-08-08 23:35:35,701] INFO Verifying properties (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,723] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,723] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,723] INFO Property log.dirs is overridden to /opt/kafka/logs (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,724] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,724] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,724] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,725] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,725] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,727] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,727] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,727] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,728] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,728] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,728] INFO Property zookeeper.connect is overridden to kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,728] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties) [2014-08-08 23:35:35,739] INFO [Kafka Server 1], starting (kafka.server.KafkaServer) [2014-08-08 23:35:35,740] INFO [Kafka Server 1], Connecting to zookeeper on kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 (kafka.server.KafkaServer) [2014-08-08 23:35:35,956] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager) [2014-08-08 23:35:35,959] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2014-08-08 23:35:35,987] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2014-08-08 23:35:35,988] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer) [2014-08-08 23:35:36,037] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-08-08 23:35:36,070] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-08-08 23:35:36,185] INFO Registered broker 1 at path /brokers/ids/1 with address kafka-broker-1:9092. (kafka.utils.ZkUtils$) [2014-08-08 23:35:36,196] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
3.3. Producer
config/producer.properties 설정 파일의 metadata.broker.list에 Broker 서버들을 추가 합니다.
root@kafka-producer:~# sed -i 's/metadata.broker.list=localhost:9092/metadata.broker.list=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092/g' /opt/kafka/config/producer.properties
– Topic 생성
producer-1, 2, 3의 topic을 생성 합니다.
root@kafka-producer:~# kafka-topics.sh \ --create \ --partitions 4 \ --topic producer-1 \ --replication-factor 1 \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 Created topic "producer-1".
root@kafka-producer:~# kafka-topics.sh \ --create \ --partitions 4 \ --topic producer-2 \ --replication-factor 1 \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 Created topic "producer-2".
root@kafka-producer:~# kafka-topics.sh \ --create \ --partitions 4 \ --topic producer-3 \ --replication-factor 1 \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 Created topic "producer-3".
– Topic 확인
root@kafka-producer:~# kafka-topics.sh \ --list \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 producer-1 producer-2 producer-3
Topic, Partition, Replication Factor 정보 확인
root@kafka-producer:~# kafka-topics.sh \ --describe \ --topic producer-1 \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 Topic: producer-1 PartitionCount:4 ReplicationFactor:1 Configs: Topic: producer-1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Topic: producer-1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: producer-1 Partition: 2 Leader: 2 Replicas: 2 Isr: 2 Topic: producer-1 Partition: 3 Leader: 3 Replicas: 3 Isr: 3
– Producer 실행
root@kafka-producer:~# kafka-console-producer.sh \ --sync \ --topic producer-1 \ --broker-list kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
3.4. Consumer
config/consumer.properties 설정 파일의 zookeeper.connect에 ZooKeeper 서버들을 추가 합니다.
root@kafka-consumer:~# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181/g' /opt/kafka/config/server.properties
– Consumer 실행
구독(Subscribe)할 Topic을 지정하여 실행 합니다.
root@kafka-consumer:~# kafka-console-consumer.sh \ --from-beginning \ --topic producer-1 \ --zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
4. 테스트
위 구성을 통해 실제로 producer 서버에서 메세지를 발생시켜, consumer가 메세지를 받을수 있는지 확인 해보겠습니다.
– Producer
kafka-producer-1의 topic은 producer-1, kafka-producer-2의 topic은 producer-2, kafka-producer-3의 topic은 producer-3으로 지정하여 실행 하고,
메세지를 발생 시킵니다.
Figure 2. Apache Kafka Producer
– Consumer
Producer와 마찬가지로 각각의 topic을 지정하여 실행하여, Broker로 부터 메세지를 받아 오는지 확인 합니다.
Figure 3. Apache Kafka Consumer
5. 참고
http://kafka.apache.org/documentation.html
http://www.infoq.com/articles/apache-kafka
http://www.slideshare.net/search/slideshow?searchfrom=header&q=apache+kafka
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/