一、Flume概述 Flume是一个分布式的数据收集系统, 具有高可靠、 高可用、 事务管理、 失 败重启等功能。 数据处理速度快, 完全可以用于生产环境。 Flume的核心是agent。 agent是一个java进程, 运行在日志收集端, 通过agent 接收日志, 然后暂存起来, 再发送到目的地。
1、Flume是一个海量日志收集系统,由Cloudera公司推出。
2、Flume中,事务(Event)是最小的数据单位,通常代表一行记录(一条记 录)。Event由source从外部数据源获取,传给channel暂存,然后传 给sink,通过sink传到目的地(HDFS)中。 3、Flume中,Agent是运行的核心,它包括3个组件: 1)source,可以从外部数据源中获取数据。例如,它可以对某一 目录(文件夹)进行监控,如果目录中有新的文件产生,source会 自动读取新文件的内容,然后将内容传递给channel进行暂存。 2)channel,可以接收source传递的数据,并将数据进行暂存, 然后将数据传递给sink。直到sink将数据成功传到目的地之后, channel才将暂存的数据删除。 channel是source与sink之间连接的桥梁,source与sink之间不能 直接发生连接,只能通过channel进行连接。 3)sink,可以接收channel传递的数据,并将数据传到目的地中。
二、Flume核心概念• 核心概念: agent、 source、 channel、 sink、 interceptor• source可以接收各种格式的数据, 如console、 thrift、 exec、 httpdent• sink可以发送到各种不同的目的地, 如hdfs、 hbase、 文件、 控制台等• channel缓冲数据, 可以放在内存、 文件、 数据库等• interceptor可以在数据传递过程中改变其属性信息
三、安装步骤:
1、解压缩:tar -zxvf apache-flume-1.6.0-bin.tar.gz
重命名:mv ******* flume配置相关的环境变量:
vi .bash_profile
export FLUME_HOME=/home/hadoop/apps/flumeexport FLUME_CONF_DIR=$FLUME_HOME/confexport PATH=$PATH:$FLUME_HOME/bin 2、vi flume-conf.properties #agent1表示代理名称agent1.sources=source1agent1.sinks=sink1agent1.channels=channel1 • #配置source1• agent1.sources.source1.type=spooldir• agent1.sources.source1.spoolDir=/home/hadoop/flumeinfo• agent1.sources.source1.channels=channel1• agent1.sources.source1.fileHeader = false• agent1.sources.source1.interceptors = i1• agent1.sources.source1.interceptors.i1.type = timestamp• #配置channel1• agent1.channels.channel1.type=file• agent1.channels.channel1.checkpointDir=/home/hadoop/flume_tmp_cp• agent1.channels.channel1.dataDirs=/home/hadoop/flume_tmp • #配置sink1• agent1.sinks.sink1.type=hdfs• agent1.sinks.sink1.hdfs.path=hdfs://liuwei1:9000/logs(如果高可就集群则hdfs://myha01/logs)• agent1.sinks.sink1.hdfs.fileType=DataStream• agent1.sinks.sink1.hdfs.writeFormat=TEXT• agent1.sinks.sink1.hdfs.rollInterval=1• agent1.sinks.sink1.channel=channel1• agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
3、创建本地文件夹: mkdir /home/hadoop/flumeinfo mkdir /home/hadoop/flume_tmp_cp mkdir /home/hadoop/flume_tmp 创建HDFS文件夹: hdfs dfs -mkdir /logs 4、启动脚本进入bin目录下 执行 ./flume-ng agent -n agent1 -c conf -f /home/hadoop/apps/flume/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
新建一份文件, 移动到/home/hadoop/flumeinfo目录下, flume就会自动上传到HDFS的/logs目录中
四、Flume+Spark Streaming
vi flume-conf.properties1
修改 配置sink1 (其他不用修改)
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = channel1agent1.sinks.sink1.hostname =0.0.0.0agent1.sinks.sink1.port = 41414再启动 ./flume-ng agent -n agent1 -c conf -f /home/hadoop/apps/flume/conf/flume-conf.properties1 -Dflume.root.logger=DEBUG,console (启动相关flume-conf.properties1)
object produce { def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("produce").setMaster("local[2]") val ssc=new StreamingContext(conf,Seconds(5)) val linesDStream=FlumeUtils.createStream(ssc,"0.0.0.0",41414) val wordsDStream=linesDStream.flatMap(line=>{ val inf=new String(line.event.getBody.array()) inf.split(" ") }) val resultDStream=wordsDStream.map((_,1)).reduceByKey(_+_) resultDStream.print() ssc.start() ssc.awaitTermination() }