Yongbok Blog

Apache Kafka – A high-throughput distributed messaging system

1. Apache Kafka?

LinkedIn에서 자사의 내부 데이터 처리를 위해 개발한 대용량 분산 메세징 시스템 입니다.

Kafka는 확장이 용이하고, 높은 처리량과 분산처리를 할수 있으며, In-Memory에 비해 뒤지지 않는 성능과 비휘발성으로 데이터를 디스크 저장 한다는 장점이 있습니다.

Kafka를 활용하는 대표적인 기업으로는 LinkedIn, Twitter, Netflix, Tumblr, Foursquare가 사용하는 것으로 알고 있으며, 그 외에도 대용량 메세징을 다루는 기업에서 활용 중인걸로 알고 있습니다.

처리할 로그와 메세지들은 넘쳐나고, 안전을 생각하자니 In-Memory는 안될 것 같고, 디스크로 처리 하자니 성능이 발목을 잡고.. 이런 걱정 하시는 분들에게 좋은 솔루션인듯 합니다.

 

2. Kafka Architecture

Kafka는 Producer, Broker, Consumer로 나누어져 있고, 시스템에서 발생 되는 메세지를 topic으로 분류 하고 있습니다.

이해를 돕기 위해 아래 그림을 통해 원리를 알아보도록 하겠습니다.

1. 시스템에서 발생 되는 메세지를 Producer가 topic으로 분류하여 Broker에게 던져 줍니다.
2. Broker는 topic을 통해 흘러들어온 메세지들을 각각의 Partition에 저장 합니다.
저장 이후 각각의 Follower들은 장애 발생을 대비하여, 무작위로 자신의 Partition에 Leader의 topic을 복제 합니다.
3. Consumer가 원하는 topic을 선정하여, Broker로 부터 데이터를 가져 옵니다.

Figure 1. Kafka Architecture

3. 실제 구성
위의 Architecture를 토대로 실제로 구성 해보면서 이해를 해보도록 하겠습니다.

서버는 총 9대로 producer 3대, broker 3대, consumer 3대로 구성하겠습니다.

각각의 서버의 Hostname, IP는 아래와 같습니다.

kafka-producer-1 : 172.17.1.239
kafka-producer-2 : 172.17.1.240
kafka-producer-3 : 172.17.1.241
kafka-broker-1   : 172.17.1.242
kafka-broker-2   : 172.17.1.243
kafka-broker-3   : 172.17.1.244
kafka-consumer-1 : 172.17.1.245
kafka-consumer-2 : 172.17.1.246
kafka-consumer-3 : 172.17.1.247

 

3.1. 공통 설정

– Hostname 별칭 설정
producer와 consumer에서 이 부분이 설정 되어있지 않으면, https://gist.github.com/ruo91/980719a9f7e1f7ef72c5 와 같은 에러를 만나 볼수 있으며, 특히 topic을 생성시에 무한대의 Exception 에러를 만나 보실수 있습니다. ㅋㅋ

root@ruo91:~# echo '172.17.1.239 kafka-producer-1' >> /etc/hosts
root@ruo91:~# echo '172.17.1.240 kafka-producer-2' >> /etc/hosts
root@ruo91:~# echo '172.17.1.241 kafka-producer-3' >> /etc/hosts
root@ruo91:~# echo '172.17.1.242 kafka-broker-1' >> /etc/hosts
root@ruo91:~# echo '172.17.1.243 kafka-broker-2' >> /etc/hosts
root@ruo91:~# echo '172.17.1.244 kafka-broker-3' >> /etc/hosts
root@ruo91:~# echo '172.17.1.245 kafka-consumer-1' >> /etc/hosts
root@ruo91:~# echo '172.17.1.246 kafka-consumer-2' >> /etc/hosts
root@ruo91:~# echo '172.17.1.247 kafka-consumer-3' >> /etc/hosts

 

– JDK 설치

root@ruo91:~# curl -LO "http://download.oracle.com/otn-pub/java/jdk/8u11-b12/jdk-8u11-linux-x64.tar.gz" \
-H 'Cookie: oraclelicense=accept-securebackup-cookie'
root@ruo91:~# tar xzf jdk-8u11-linux-x64.tar.gz
root@ruo91:~# mv jdk1.8.0_11 /usr/local/jdk
root@ruo91:~# rm -f jdk-8u11-linux-x64.tar.gz
root@ruo91:~# echo '# JDK' >> /etc/profile
root@ruo91:~# echo 'export JAVA_HOME=/usr/local/jdk" >> /etc/profile
root@ruo91:~# echo 'export PATH="$PATH:$JAVA_HOME/bin"' >> /etc/profile
root@ruo91:~# echo '' >> /etc/profile

 

– Kafka 설치
Kafka 0.8.1.1 버전을 사용하겠습니다.

root@ruo91:~# curl -LO "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz" \
root@ruo91:~# tar xzf kafka_2.10-0.8.1.1.tgz
root@ruo91:~# mv kafka_2.10-0.8.1.1.tgz /opt/kafka
root@ruo91:~# rm -rf kafka_2.10-0.8.1.1.tgz
root@ruo91:~# echo '# Apache Kafka' >> /etc/profile
root@ruo91:~# echo "export KAFKA_HOME=/opt/kafka" >> /etc/profile
root@ruo91:~# echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile

 

3.2. Broker
Broker에서는 config/zookeeper.properties, config/server.properties 설정 파일을 수정 합니다.

– ZooKeeper 설정
기존에 kafka에서 포함 되어 있는 config/zookeeper.properties 설정 파일에는 tickTime, initLimit, syncLimit 옵션들이 빠져 있습니다.
이 옵션들을 추가하고, 동일한 standalone 형태의 ZooKeeper들을 복제 모드로 설정 합니다.

root@kafka-broker:~# echo 'tickTime=2000' >> /opt/kafka/config/zookeeper.properties
root@kafka-broker:~# echo 'initLimit=10' >> /opt/kafka/config/zookeeper.properties
root@kafka-broker:~# echo 'syncLimit=5' >> /opt/kafka/config/zookeeper.properties

 

데이터 디렉토리를 변경하고, 복제 모드로 설정을 합니다.

root@kafka-broker:~# sed -i '/^dataDir/ s:.*:dataDir=/opt/zk-data:' /opt/kafka/config/zookeeper.properties
root@kafka-broker:~# echo 'server.1=kafka-broker-1:2888:3888' >> /opt/kafka/config/zookeeper.properties
root@kafka-broker:~# echo 'server.2=kafka-broker-2:2888:3888' >> /opt/kafka/config/zookeeper.properties
root@kafka-broker:~# echo 'server.3=kafka-broker-3:2888:3888' >> /opt/kafka/config/zookeeper.properties

 

ZooKeeper 서버들에게 myid 번호를 부여 합니다.
kafka-broker-1의 경우 myid는 1, kafka-broker-2의 경우 myid는 2, kafka-broker-3의 경우 myid는 3으로 설정 합니다.

root@kafka-broker:~# mkdir /opt/zk-data
root@kafka-broker:~# echo '1' > /opt/zk-data/myid

 

– Broker 설정
config/server.properties의 broker.id를 설정합니다.
kafka-broker-1에는 1, kafka-broker-2에는 2, kafka-broker-3에는 3으로 변경 합니다.

root@kafka-broker:~# broker.id=1
root@kafka-broker:~# sed -i '/^broker.id/ s:.*:broker.id=1:' /opt/kafka/config/server.properties

 

Broker의 partition 및 서버 로그가 생성 될 디렉토리를 변경 합니다.

root@kafka-broker:~# sed -i '/^log.dirs/ s:.*:log.dirs=/opt/kafka/logs:' /opt/kafka/config/server.properties

 

ZooKeeper 서버들을 추가 합니다.

root@kafka-broker:~# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181/g' /opt/kafka/config/server.properties

 

– ZooKeeper 실행

root@kafka-broker:~# zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

 

– Broker 실행
Broker 서버를 실행과 동시에 kafka-broker-1 서버가 리더로 선출된 것을 확인 할수 있고, 나머지 kafka-broker-2, 3은 Follower로써 역할을 수행하게 됩니다.

root@kafka-broker:~# kafka-server-start.sh /opt/kafka/config/server.properties
[2014-08-08 23:35:35,701] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,723] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,723] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,723] INFO Property log.dirs is overridden to /opt/kafka/logs (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,724] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,724] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,724] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,725] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,725] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,727] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,727] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,727] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,728] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,728] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,728] INFO Property zookeeper.connect is overridden to kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,728] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-08-08 23:35:35,739] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
[2014-08-08 23:35:35,740] INFO [Kafka Server 1], Connecting to zookeeper on kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181 (kafka.server.KafkaServer)
[2014-08-08 23:35:35,956] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-08-08 23:35:35,959] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-08-08 23:35:35,987] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-08-08 23:35:35,988] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-08-08 23:35:36,037] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-08-08 23:35:36,070] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-08-08 23:35:36,185] INFO Registered broker 1 at path /brokers/ids/1 with address kafka-broker-1:9092. (kafka.utils.ZkUtils$)
[2014-08-08 23:35:36,196] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

 

3.3. Producer
config/producer.properties 설정 파일의 metadata.broker.list에 Broker 서버들을 추가 합니다.

root@kafka-producer:~# sed -i 's/metadata.broker.list=localhost:9092/metadata.broker.list=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092/g' /opt/kafka/config/producer.properties

 

– Topic 생성
producer-1, 2, 3의 topic을 생성 합니다.

root@kafka-producer:~# kafka-topics.sh \
--create \
--partitions 4 \
--topic producer-1 \
--replication-factor 1 \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
Created topic "producer-1".
root@kafka-producer:~# kafka-topics.sh \
--create \
--partitions 4 \
--topic producer-2 \
--replication-factor 1 \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
Created topic "producer-2".
root@kafka-producer:~# kafka-topics.sh \
--create \
--partitions 4 \
--topic producer-3 \
--replication-factor 1 \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
Created topic "producer-3".

 

– Topic 확인

root@kafka-producer:~# kafka-topics.sh \
--list \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
producer-1
producer-2
producer-3

 

Topic, Partition, Replication Factor 정보 확인

root@kafka-producer:~# kafka-topics.sh \
--describe \
--topic producer-1 \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181
Topic: producer-1 PartitionCount:4 ReplicationFactor:1 Configs:
Topic: producer-1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: producer-1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: producer-1 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: producer-1 Partition: 3 Leader: 3 Replicas: 3 Isr: 3

 

– Producer 실행

root@kafka-producer:~# kafka-console-producer.sh \
--sync \
--topic producer-1 \
--broker-list kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092

 

3.4. Consumer
config/consumer.properties 설정 파일의 zookeeper.connect에 ZooKeeper 서버들을 추가 합니다.

root@kafka-consumer:~# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181/g' /opt/kafka/config/server.properties

 

– Consumer 실행
구독(Subscribe)할 Topic을 지정하여 실행 합니다.

root@kafka-consumer:~# kafka-console-consumer.sh \
--from-beginning \
--topic producer-1 \
--zookeeper kafka-broker-1:2181,kafka-broker-2:2181,kafka-broker-3:2181

 

4. 테스트
위 구성을 통해 실제로 producer 서버에서 메세지를 발생시켜, consumer가 메세지를 받을수 있는지 확인 해보겠습니다.

– Producer
kafka-producer-1의 topic은 producer-1, kafka-producer-2의 topic은 producer-2, kafka-producer-3의 topic은 producer-3으로 지정하여 실행 하고,
메세지를 발생 시킵니다.

Figure 2. Apache Kafka Producer

– Consumer
Producer와 마찬가지로 각각의 topic을 지정하여 실행하여, Broker로 부터 메세지를 받아 오는지 확인 합니다.

Figure 3. Apache Kafka Consumer

5. 참고
http://kafka.apache.org/documentation.html
http://www.infoq.com/articles/apache-kafka
http://www.slideshare.net/search/slideshow?searchfrom=header&q=apache+kafka
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/

Exit mobile version