
Using Flume to Consume Kafka Data into HDFS

Date: 2019-08-18

1. Overview

Kafka is a good choice for forwarding data. Kafka loads data into a message queue and holds it there until other business scenarios consume it, and the ecosystem around its client APIs is rich, with integrations for various storage systems such as HDFS and HBase. If you do not want to write code against the Kafka API to consume a Kafka topic, there are also off-the-shelf components that can do the consuming for you. In this post I will show how to use Flume to quickly consume data from a Kafka topic and forward the consumed data to HDFS.

2. Content

Before implementing this solution, let's first look at the overall data flow, as shown in the figure below:

(Figure: data flow from Kafka through Flume into HDFS)

Business data is written to the Kafka cluster in real time; a Flume Source component then consumes the business topic from Kafka in real time, and the consumed data is delivered to HDFS for storage through a Flume Sink component.

2.1 Prepare the Base Environment

Following the data flow shown in the figure above, you need to have Kafka, Flume, and Hadoop (with HDFS available) ready.

2.1.1 Start the Kafka Cluster and Create a Topic

Kafka does not currently ship with a script for managing all brokers in one batch, but we can wrap the kafka-server-start.sh and kafka-server-stop.sh scripts ourselves. The code is shown below:

#! /bin/bash
# Kafka broker node addresses; if there are many nodes, they can be kept in a file instead
hosts=(dn1 dn2 dn3)
# NOTE: the ssh user (hadoop) used below is environment-specific; adjust it to your own setup

# Print a message for the operation being executed
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.

# Run the start command on every broker
function start()
{
    for i in ${hosts[@]}
    do
        smill=`date "+%N"`
        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
        ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
        sleep 1
    done
}

# Run the stop command on every broker
function stop()
{
    for i in ${hosts[@]}
    do
        smill=`date "+%N"`
        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
        ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
        sleep 1
    done
}

# Check the status of every Kafka broker node
function status()
{
    for i in ${hosts[@]}
    do
        smill=`date "+%N"`
        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
        ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
        sleep 1
    done
}

# Validate the command-line argument
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    status)
        status
        ;;
    *)
        echo "Usage: $0 {start|stop|status}"
        RETVAL=1
esac
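Assuming the wrapper above is saved as kafka-cluster.sh (the file name is only an example) on a node that can SSH to all brokers, it can be driven as follows:

# make the wrapper executable (script name is only an example)
chmod +x kafka-cluster.sh
# start, check, and stop all brokers listed in the hosts array
./kafka-cluster.sh start
./kafka-cluster.sh status
./kafka-cluster.sh stop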

After starting the Kafka cluster, and once the cluster is healthy, create a business topic with the following command:

# Create a topic named flume_collector_data
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_collector_data
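To confirm the topic was created with the expected partition and replica layout, you can describe it with the same kafka-topics.sh tool and ZooKeeper address used above:

# Verify the partition and replica assignment of the new topic
kafka-topics.sh --describe --zookeeper dn1:2181,dn2:2181,dn3:2181 --topic flume_collector_data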

2.2 Configure the Flume Agent

Next, configure the Flume agent so that Flume reads data from the flume_collector_data topic in the Kafka cluster and writes what it reads to HDFS for storage. The configuration is as follows:

# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka
# channels alias
agent.channels = mem_channel
# sink alias
agent.sinks = hdfs_sink

# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.channels = mem_channel
agent.sources.source_from_kafka.batchSize = 5000
# set kafka broker address
agent.sources.source_from_kafka.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092
# set kafka topic
agent.sources.source_from_kafka.kafka.topics = flume_collector_data
# set kafka consumer group id
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id

# define hdfs sink
agent.sinks.hdfs_sink.type = hdfs
# specify the channel the sink should use
agent.sinks.hdfs_sink.channel = mem_channel
# set the hdfs path used for storage
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d
# roll files on a time interval only (size- and count-based rolling disabled)
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text

# define channel from kafka source to hdfs sink
agent.channels.mem_channel.type = memory
# channel capacity (events)
agent.channels.mem_channel.capacity = 100000
# transaction size (events)
agent.channels.mem_channel.transactionCapacity = 10000
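One thing to watch out for: the %Y%m%d escape in hdfs.path is resolved from a timestamp header on each event. If the events arriving from Kafka do not carry such a header in your Flume version, the HDFS sink cannot resolve the path; a common workaround, sketched below, is to let the sink use the agent's local clock instead:

# Optional: resolve %Y%m%d from the Flume agent's local clock instead of an event header
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true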

Then start the Flume agent with the following command:

# Run the Flume agent in the background on Linux
flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hdfs.properties &
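The file name kafka2hdfs.properties simply has to match wherever you saved the configuration above. While testing, it can be easier to run the agent in the foreground with console logging so configuration problems show up immediately:

# Foreground run with console logging, useful for debugging the configuration
flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hdfs.properties -Dflume.root.logger=INFO,console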

2.3 Send Data to the Kafka Topic

Start the Kafka Eagle monitoring system (by running the ke.sh start command), then fill in and send some test data, as shown in the figure below:

Then check whether the data has been written to the topic, as shown in the figure below:

(Figure: messages visible in the flume_collector_data topic in Kafka Eagle)
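If Kafka Eagle is not available, the same test can be done with the console tools that ship with Kafka (a minimal sketch; the message bodies you type are arbitrary):

# Produce a few test messages to the topic
kafka-console-producer.sh --broker-list dn1:9092,dn2:9092,dn3:9092 --topic flume_collector_data
# In another terminal, confirm the messages are readable from the topic
kafka-console-consumer.sh --bootstrap-server dn1:9092,dn2:9092,dn3:9092 --topic flume_collector_data --from-beginning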

Finally, look under the corresponding HDFS path to see the data Flume has delivered; the result is shown in the figure below:
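On the command line, the same check can be done with the HDFS client. The date directory and file name below are only examples; they follow the %Y%m%d pattern from hdfs.path and the HDFS sink's default FlumeData file prefix:

# List the date-partitioned directories written by the HDFS sink
hdfs dfs -ls /data/flume/kafka/
# Inspect the contents of one of the rolled files (path shown is an example)
hdfs dfs -cat /data/flume/kafka/20190818/FlumeData.*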

3. How Kafka Data Is Transferred to HBase via Flume

3.1 Create a New Topic

Create a new topic with the following command:

# Create a topic named flume_kafka_to_hbase
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_kafka_to_hbase

3.2 Configure the Flume Agent

Next, configure the Flume agent as follows:

# ------------------- define data source ----------------------
# source alias
agent.sources = kafkaSource
# channels alias
agent.channels = kafkaChannel
# sink alias
agent.sinks = hbaseSink

# bind the kafka source to the channel
agent.sources.kafkaSource.channels = kafkaChannel
# bind the hbase sink to the channel
agent.sinks.hbaseSink.channel = kafkaChannel

# set kafka source
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# set kafka broker address
agent.sources.kafkaSource.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092
# set kafka topic
agent.sources.kafkaSource.kafka.topics = flume_kafka_to_hbase
# set kafka consumer group id
agent.sources.kafkaSource.kafka.consumer.group.id = flume_test_id

# set channel
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
# channel queue size
agent.channels.kafkaChannel.capacity = 10000
# transaction size
agent.channels.kafkaChannel.transactionCapacity = 1000

# set hbase sink
agent.sinks.hbaseSink.type = asynchbase
# hbase table
agent.sinks.hbaseSink.table = flume_data
# hbase column family
agent.sinks.hbaseSink.columnFamily = info
# event serializer for the hbase sink
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# hbase zookeeper quorum
agent.sinks.hbaseSink.zookeeperQuorum = dn1:2181,dn2:2181,dn3:2181
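As written, the Kafka channel has no broker address of its own. In Flume 1.7+ the Kafka channel takes a kafka.bootstrap.servers property, so if the agent reports that the channel cannot reach Kafka, adding the line below should fix it (reusing the same brokers as the source is an assumption about your setup). Note also that SimpleAsyncHbaseEventSerializer simply writes each event body into a single column of the info column family.

# Brokers for the Kafka channel itself (assumed to be the same cluster as the source)
agent.channels.kafkaChannel.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092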

3.3 Create the HBase Table

Go to the HBase cluster and run the table creation command, as shown below:

hbase(main):002:0> create "flume_data","info"
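Before starting the agent, you can confirm from the HBase shell that the table and its column family exist (output omitted):

list
describe "flume_data"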

3.4 Start the Flume Agent

Next, start the Flume agent instance with the following command:

# Run the Flume agent in the background on Linux
flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hbase.properties &

3.5 Write Data to the Topic in Kafka Eagle

Then write data to the topic in Kafka Eagle, as shown in the figure below:

3.6 Query the Transferred Data in HBase

Finally, query the flume_data table in HBase to verify that the data was transferred successfully, using the following command:

hbase(main):003:0> scan "flume_data"

A preview of the result looks like the following:

(Figure: scan output of the flume_data table in HBase)
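If the table already contains a lot of rows, a bounded scan keeps the output manageable (the LIMIT value is arbitrary):

hbase(main):004:0> scan "flume_data", {LIMIT => 10}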

4. Summary

At this point, the data in the business Kafka topic has been consumed by the Flume Source component and written to HDFS by the Flume Sink component, and the whole process saved us a large amount of business coding. If your real-world work does not involve complex business logic and you only need to forward Kafka data, this approach is well worth trying.

5. Closing Remarks

That is all I want to share in this post. If you run into any problems while studying this topic, you can join the discussion group or send me an email, and I will do my best to answer. Let's keep learning together!

In addition, I have published a book, 《Kafka并不難學》 (Kafka Is Not Hard to Learn). Friends and readers who are interested can buy it through the purchase link in the announcement bar. Thank you all for your support.
