Flume1.9.0的安装、部署、简单应用(含分布式、与Hadoop3.1.2、Hbase1.4.9的案例)

Flume1.9.0的安装、部署、简单应用(含分布式、与Hadoop3.1.2、Hbase1.4.9的案例)

目录

[toc]

前言

什么是Flume?

  Apache Flume是一个分布式,可靠且可用的系统,用于有效地从许多不同的源收集,聚合和移动大量日志数据到集中式数据存储。
  Apache Flume 的使用不仅限于日志数据聚合。由于数据源是可定制的,因此 Flume 可用于传输大量事件数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎任何可能的数据源。

Flume的特点

  Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFSHbase等)的能力 。
  Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 EventAgent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个) Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或者把事件推向另一个 Source
  

Flume的可靠性

  当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume 的事件是通过 AgentChannel 中进行的。然后将事件传递到流中的下一个 Agent 或终端存储库(如HDFS)。
  只有将事件存储在下一个 AgentChannel 或终端存储库中后,才会从 Channel 中删除这些事件。
  这就是 Flume 中的单跳消息传递语义如何提供流的端到端可靠性。可确保事件在流中从一个点到另一个点可靠地传递。在多条流的情况下,来自前一条的 Sink 和来自下一条的 Source 都运行其事务以确保数据安全地存储在下一个的Channel 中。
  

Flume的可恢复性

  还是靠 Channel。推荐使用 FileChannel ,事件持久化在本地文件系统里(性能较差)。 而内存通道,它只是将事件存储在内存中的队列中,虽然更快,但是当 Agent 进程死亡时仍然留在内存通道中的任何事件都无法恢复。
  

Flume的一些核心概念

  • Agent 使用 JVM 运行 Flume。每台机器运行一个 Agent,但是可以在一个 Agent中包含多个 SourcesSinks
  • Client 生产数据,运行在一个独立的线程。
  • SourceClient 收集数据,传递给 Channel
  • SinkChannel 收集数据,运行在一个独立线程。
  • Channel连接 SourcesSinks ,这个有点像一个队列。Events 可以是日志记录、 avro 对象等。

  FlumeAgent 为最小的独立运行单位。一个 Agent 就是一个 JVM。单 AgentSourceSinkChannel三大组件构成。

  Flume 的每个组件设置属性type,以了解它需要什么类型的对象。每个源,接收器和通道类型都有自己的一组属性,使其能够按预期运行。
  
  值得注意的是,Flume 提供了大量内置的 SourceChannelSink 类型。不同类型的 Source, ChannelSink 可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel 可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink 可以把日志写入HDFS, HBase,甚至是另外一个 Source 等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作。
  flume1.9-01
  

Flume的官方网站在哪里?

  http://flume.apache.org/
  

Flume在哪里下载以及如何安装?

  本文的运行环境,是基于文章<Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建>,关于 axel 工具的安装,也请参考文章。

[root@c0 _src]# pwd
/home/work/_src
[root@c0 _src]# axel -n 10 -o /home/work/_src/flume.tar.gz http://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
[root@c0 _src]# tar -xzvf flume.tar.gz
[root@c0 _src]# mv apache-flume-1.9.0-bin /home/work/_app/

  

设置环境变量

echo "" >> /etc/bashrc
echo "# Flume 1.9.0" >> /etc/bashrc
echo "export FLUME_HOME=/home/work/_app/apache-flume-1.9.0-bin/" >> /etc/bashrc
echo "export PATH=\$PATH:\$FLUME_HOME/bin" >> /etc/bashrc
source /etc/bashrc

  

验证是否安装成功

[root@c0 _src]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

  

Flume的案例

案例1:Avro

  侦听 Avro 端口并从外部 Avro 客户端流接收事件。Avro 可以发送一个给定的文件给 FlumeAvro 源使用 AVRO RPC 机制。
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/avro.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/avro.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
## 要监听的主机名或IP地址
a1.sources.r1.bind = 0.0.0.0
## 要监听的端口号
a1.sources.r1.port = 4141

# k1 是 sink 的名称,设置 sink的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100
## 定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。
a1.channels.c1.byteCapacityBufferPercentage = 20
## 允许的最大内存总字节数,作为此通道中所有事件的总和。该实现仅计算Event主体,这也是提供byteCapacityBufferPercentage配置参数的原因。默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会被重复计算以用于通道byteCapacity
a1.channels.c1.byteCapacity = 800000

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 00:49:38,082 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@365969a6 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2019-03-11 00:49:38,086 INFO node.Application: Starting Channel c1
2019-03-11 00:49:38,086 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 00:49:38,587 INFO node.Application: Starting Sink k1
2019-03-11 00:49:38,588 INFO node.Application: Starting Source r1
2019-03-11 00:49:38,588 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 00:49:38,867 INFO source.AvroSource: Avro source r1 started.

  
  创建指定文件

[root@c0 ~]# echo "hello mshk.top" > $FLUME_HOME/log.00

  
  使用 avro-client 发送文件

[root@c0 ~]# flume-ng avro-client -c . -H c0 -p 4141 -F $FLUME_HOME/log.00
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/_app/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/_app/hadoop-3.1.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-03-11 00:50:46,621 INFO api.NettyAvroRpcClient: Using default maxIOWorkers

Flume 发行版中包含的 avro-client 可以使用 avro RPC 机制将给定文件发送到 Flume Avro

  
  在 c0 的控制台,可以看到以下信息,注意最后一行:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 00:49:38,082 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@365969a6 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2019-03-11 00:49:38,086 INFO node.Application: Starting Channel c1
2019-03-11 00:49:38,086 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 00:49:38,587 INFO node.Application: Starting Sink k1
2019-03-11 00:49:38,588 INFO node.Application: Starting Source r1
2019-03-11 00:49:38,588 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 00:49:38,867 INFO source.AvroSource: Avro source r1 started.
2019-03-11 00:55:22,708 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] OPEN
2019-03-11 00:55:22,710 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] BOUND: /10.0.0.100:4141
2019-03-11 00:55:22,710 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] CONNECTED: /10.0.0.100:58786
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] DISCONNECTED
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] UNBOUND
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] CLOSED
2019-03-11 00:55:22,934 INFO ipc.NettyServer: Connection to /10.0.0.100:58786 disconnected.
2019-03-11 00:55:26,880 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70       hello mshk.top }

  

案例2:Spool

  Spool 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:
  1) 拷贝到 Spool 目录下的文件不可以再打开编辑。
  2)Spool 目录下不可包含相应的子目录
  
  与Exec Source不同,即使 Flume 重新启动或被杀死,Spool 也是可靠的并且不会遗漏数据。
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/spool.conf 文件编辑并保存,内容如下

[root@c0 ~]# mkdir -p $FLUME_HOME/logs
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/spool.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
## 读取文件的目录
a1.sources.r1.spoolDir = /home/work/_app/apache-flume-1.9.0-bin/logs
## 是否添加存储绝对路径文件名的标头。
a1.sources.r1.fileHeader = true
## 反序列化器使用的字符集,将输入文件视为文本。
a1.sources.r1.inputCharset = UTF-8

# k1 是 sink 的名称,设置 sink的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 01:58:04,582 INFO node.Application: Starting Sink k1
2019-03-11 01:58:04,587 INFO node.Application: Starting Source r1
2019-03-11 01:58:04,588 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/work/_app/apache-flume-1.9.0-bin/logs
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  新打开一个窗口,输入命令追加文件到 /home/hadoop/flume-1.5.0-bin/logs 目录

[root@c0 ~]# echo "spool test1" > /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log

  
  在 c0 的控制台,可以看到以下相关信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 01:58:04,582 INFO node.Application: Starting Sink k1
2019-03-11 01:58:04,587 INFO node.Application: Starting Source r1
2019-03-11 01:58:04,588 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/work/_app/apache-flume-1.9.0-bin/logs
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

2019-03-11 02:00:17,055 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2019-03-11 02:00:17,055 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log to /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log.COMPLETED
2019-03-11 02:00:18,617 INFO sink.LoggerSink: Event: { headers:{file=/home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31                spool test1 }

  

案例3:Exec

  Exec 在启动时运行给定的 Unix 命令,并期望该进程在标准输出上连续生成数据(stderr被简单地丢弃,除非属性logStdErr设置为true)。
  如果进程因任何原因退出,则源也会退出并且不会生成其他数据。这意味着 cat [named pipe]tail -F [file] 等配置将产生所需的结果,而日期可能不会 – 前两个命令产生数据流,而后者产生单个事件并退出。
  下面的实例中,EXEC 执行一个给定的命令获得输出的源,如果要使用 tail 命令,必选使得 file 足够大才能看到输出内容
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/exec_tail.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/exec_tail.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
## 要执行的命令
a1.sources.r1.command = tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 03:38:47,979 INFO source.ExecSource: Exec source starting with command: tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  生成足够多的内容在文件里

[root@c0 ~]# for i in {1..100};do echo "exec tail$i" >> /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail;echo $i;sleep 0.1;done

  
  在 c0 的控制台,可以看到以下信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 03:38:47,979 INFO source.ExecSource: Exec source starting with command: tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31                   exec tail1 }
2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 32                   exec tail2 }
2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 33                   exec tail3 }
2019-03-11 03:48:30,119 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 34                   exec tail4 }
...
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38                exec tail98 }
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39                exec tail99 }
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30             exec tail100 }

  

案例4:Syslogtcp

  Syslogtcp 监听 TCP 的端口做为数据源  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/syslog_tcp.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/syslog_tcp.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
## 要绑定监听的端口号
a1.sources.r1.port = 5140
## 要绑定的主机名或IP地址
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:45:11,383 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  测试产生syslog

[root@c0 ~]# echo "hello idoall.org syslog" | nc localhost 5140

如果 nc 命令不存在,可以使用 yum install nmap-ncat.x86_64 -y 安装

  
  在 c0 的控制台,可以看到以下信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:45:11,383 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

2019-03-11 04:48:39,679 WARN source.SyslogUtils: Event created from Invalid Syslog data.
2019-03-11 04:48:39,688 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }

  

案例5:JSONHandler

  可以处理以 JSON 格式表示的事件,并支持UTF-8UTF-16UTF-32字符集。
  处理程序接受一个事件数组(即使只有一个事件,事件必须在数组中发送),并根据请求中指定的编码将它们转换为 Flume 事件。如果未指定编码,则假定为 UTF-8
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/post_json.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/post_json.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:54:09,997 INFO server.Server: Started @1582ms
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  生成 JSON 格式的 POST request

[root@c0 ~]# curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "mshk.top body"}]' http://localhost:8888

  
  在 c0 的控制台,可以看到以下信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:54:09,997 INFO server.Server: Started @1582ms
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

2019-03-11 04:55:14,560 INFO sink.LoggerSink: Event: { headers:{a=a1, b=b1} body: 6D 73 68 6B 2E 74 6F 70 5F 62 6F 64 79          mshk.top body }

  

案例6:Hadoop sink

  此接收器将事件写入 Hadoop 分布式文件系统(HDFS)。
  目前支持创建文本和序列文件。支持两种文件类型的压缩。
可以根据经过的时间或数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。
  它还根据事件源自的时间戳或机器等属性对数据进行分区/分区。HDFS 目录路径可能包含格式转义序列,将由 HDFS 接收器替换,以生成用于存储事件的目录/文件名。
  以下是支持的转义序列:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, …)
%A locale’s full weekday name (Monday, Tuesday, …)
%b locale’s short month name (Jan, Feb, …)
%B locale’s long month name (January, February, …)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

转义字符串%[localhost],%[IP]和%[FQDN]都依赖于Java获取主机名的能力,这在某些网络环境中可能会失败。

  
  使用此接收器需要安装 Hadoop,以便 Flume 可以使用 Hadoop jarHDFS 集群进行通信。
  其中关于 Hadoop 部分的安装部署,请参考文章<Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建>
 
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/hdfs_sink.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/hdfs_sink.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://c1:8020/flume/syslogtcp
## 是否使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## filePrefix 文件名称前缀
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H
## fileSuffix 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .log
## minBlockReplicas 指定每个HDFS块的最小副本数。如果未指定,则它来自类路径中的默认Hadoop配置。
a1.sinks.k1.hdfs.minBlockReplicas = 1
## DataStream不会压缩输出文件,默认为SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
## writeFormat 序列文件记录的格式。在使用Flume创建数据文件之前设置为Text,否则 Apache Impala 或Apache Hive无法读取这些文件。
a1.sinks.k1.hdfs.writeFormat = Text
## rollInterval 按照时间、大小、条数将临时文件滚动成最.log文件,值为0时不按照这个规则滚动
a1.sinks.k1.hdfs.rollInterval = 300
## rollSize 触发滚动的文件大小,以字节为单位(0:永不基于文件大小滚动)
a1.sinks.k1.hdfs.rollSize = 0
## rollCount 在滚动之前写入文件的事件数(0 =从不基于事件数滚动)
a1.sinks.k1.hdfs.rollCount = 0
## idleTimeout 超时后非活动文件关闭(0 =禁用自动关闭空闲文件)
a1.sinks.k1.hdfs.idleTimeout = 0
## 将文件刷新到HDFS之前写入文件的事件数
a1.sinks.k1.hdfs.batchSize = 0
## round 是否应将时间戳向下舍入(如果为true,则影响除%t之外的所有基于时间的转义序列)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:59:23,046 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 04:59:23,485 INFO node.Application: Starting Sink k1
2019-03-11 04:59:23,486 INFO node.Application: Starting Source r1
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
2019-03-11 04:59:23,583 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  测试产生syslog

[root@c0 ~]# echo "hello mshk.top flume hadoop testing one" | nc localhost 5140

  
  在 c0 的控制台,可以看到以下信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:59:23,046 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 04:59:23,485 INFO node.Application: Starting Sink k1
2019-03-11 04:59:23,486 INFO node.Application: Starting Source r1
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
2019-03-11 04:59:23,583 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

2019-03-11 05:12:08,792 WARN source.SyslogUtils: Event created from Invalid Syslog data.
2019-03-11 05:12:08,792 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
2019-03-11 05:12:08,813 INFO hdfs.BucketWriter: Creating hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp
2019-03-11 05:12:38,853 INFO hdfs.HDFSEventSink: Writer callback called.
2019-03-11 05:12:38,853 INFO hdfs.BucketWriter: Closing hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp
2019-03-11 05:12:38,885 INFO hdfs.BucketWriter: Renaming hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp to hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793

  
  在 c0 上再打开一个窗口,去 hadoop 上检查文件是否生成

[root@c0 ~]# hadoop fs -ls /flume/syslogtcp
Found 1 items
-rw-r--r--   3 root supergroup        177 2019-03-11 00:32 /flume/syslogtcp/Syslog.1552251858905
[root@c0 ~]# hadoop fs -cat /flume/syslogtcp/Syslog.*
hello mshk.top flume hadoop testing one

  

案例7:File Roll Sink

  在本地文件系统上存储事件
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/file_roll.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/file_roll.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = file_roll
## 将存储文件的目录
a1.sinks.k1.sink.directory = /home/work/_app/apache-flume-1.9.0-bin/logs

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 05:18:30,099 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 05:18:30,126 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 05:18:30,127 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  测试产生log

[root@c0 ~]# echo "hello mshk.top syslog" | nc localhost 5555
[root@c0 ~]# echo "hello mshk.top syslog 2" | nc localhost 5555

  
  查看/home/work/_app/apache-flume-1.9.0-bin/logs下是否生成文件,默认每30秒生成一个新文件

[root@c0 ~]# ll /home/work/_app/apache-flume-1.9.0-bin/logs
total 12
-rw-r--r--. 1 root root    0 Mar 11 05:18 1552252709477-1
-rw-r--r--. 1 root root    0 Mar 11 05:19 1552252709477-2
-rw-r--r--. 1 root root    0 Mar 11 05:19 1552252709477-3
-rw-r--r--. 1 root root    0 Mar 11 05:20 1552252709477-4
-rw-r--r--. 1 root root    0 Mar 11 05:20 1552252709477-5
-rw-r--r--. 1 root root    0 Mar 11 05:21 1552252709477-6
-rw-r--r--. 1 root root   46 Mar 11 05:21 1552252709477-7
-rw-r--r--. 1 root root    0 Mar 11 05:22 1552252709477-8
-rw-r--r--. 1 root root    0 Mar 11 05:22 1552252709477-9
-rw-r--r--. 1 root root 1192 Mar 11 03:48 log_exec_tail
-rw-r--r--. 1 root root   12 Mar 11 02:00 spool_text.log.COMPLETED
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/logs/1552252709477*
hello mshk.top syslog
hello mshk.top syslog 2

  

案例8:Replicating Channel Selector

  Flume 支持 Fan out 流从一个源到多个通道。。在复制的情况下,流的事件被发送到所有的配置通道。
  这次我们需要用到 c0c1 两台机器
  在 c0 创建 /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector.conf 文件编辑并保存,内容如下

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c0 c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
## 设置 Channel的名称分别是 c0 c1
a1.sources.r1.channels = c0 c1
a1.sources.r1.selector.type = replicating

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555

# k2 是 sink 的名称,设置 sink 的类型
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

  
  在 c0 创建 /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector_avro.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector_avro.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c0

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c0.type = memory
## Channel 中存储的最大事件数
a1.channels.c0.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c0.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0

  
  在 c0 上将2个配置文件复制到 c1 上一份

[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
replicating_Channel_Selector_avro.conf  100%  485   832.0KB/s   00:00
replicating_Channel_Selector.conf       100%  723     1.5MB/s   00:00

  
  打开4个窗口,在 c0c1 上同时启动两个 FlumeAgent

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 05:34:22,172 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 05:34:22,175 INFO source.AvroSource: Avro source r1 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:03:02,811 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:03:02,814 INFO source.AvroSource: Avro source r1 started.

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 05:36:41,054 INFO sink.AbstractRpcSink: Rpc sink k1 started.
2019-03-11 05:36:41,056 INFO sink.AbstractRpcSink: Rpc sink k2 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:06:04,019 INFO sink.AbstractRpcSink: Rpc sink k1 started.
2019-03-11 01:06:04,019 INFO sink.AbstractRpcSink: Rpc sink k2 started.

  
  然后在 c0c1 分别测试产生syslog

[root@c1 ~]# echo "hello mshk.top" | nc localhost 5140
[root@c0 ~]# echo "hello mshk.top1" | nc localhost 5140

  
  在 c0c1 的sink窗口,可以看到以下信息,这说明信息得到了复用:

# c0
[root@c0 ~]#  flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 11:47:45,208 INFO ipc.NettyServer: [id: 0xcdc38369, /10.0.0.101:54420 => /10.0.0.100:5555] BOUND: /10.0.0.100:5555
2019-03-11 11:47:45,208 INFO ipc.NettyServer: [id: 0xcdc38369, /10.0.0.101:54420 => /10.0.0.100:5555] CONNECTED: /10.0.0.101:54420
2019-03-11 11:48:28,714 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70       hello mshk.top }
2019-03-11 11:48:51,429 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31    hello mshk.top1 }

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 06:08:35,762 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] OPEN
2019-03-11 06:08:35,763 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] BOUND: /10.0.0.101:5555
2019-03-11 06:08:35,763 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] CONNECTED: /10.0.0.101:57248
2019-03-11 06:09:21,731 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70       hello mshk.top }
2019-03-11 06:09:43,734 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31    hello mshk.top1 }

  

案例9:Multiplexing Channel Selector

  多路复用情况,当事件的属性与预配置的值匹配时,事件将被传递到可用通道的子集。
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# r1 是 source 的名称,设置 source 的 channel,通过HTTP POST和GET接受Flume事件的源
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# 映射允许每个值通道可以重叠。默认值可以包含任意数量的通道。
a1.sources.r1.selector.mapping.baidu = c0
a1.sources.r1.selector.mapping.ali = c1
a1.sources.r1.selector.default = c0

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555

# k2 是 sink 的名称,设置 sink 的类型
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

  
  在 c0 创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector_avro.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector_avro.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0

  
  将2个配置文件复制到 c1 上一份

[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Multiplexing_Channel_Selector_avro.conf     100%  485   639.8KB/s   00:00
Multiplexing_Channel_Selector.conf          100%  963     1.4MB/s   00:00

  
  打开4个窗口,在 c0c 上同时启动两个 FlumeAgent

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:05:23,297 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 06:05:23,297 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 06:05:23,308 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:34:05,370 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 01:34:05,377 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 01:34:05,383 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:20:59,177 INFO server.Server: Started @1519ms
2019-03-11 06:20:59,178 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:20:59,178 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:50:05,998 INFO server.Server: Started @1315ms
2019-03-11 01:50:05,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:50:05,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  然后在 c0 上,测试产生log

[root@c0 ~]# curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "mshk.top_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "mshk.top_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "mshk.top_TEST3"}]' http://localhost:5140

  
  在 c0sink 窗口,可以看到以下信息:

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:05:23,297 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 06:05:23,297 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 06:05:23,308 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

2019-03-11 06:22:58,825 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 31       mshk.top_TEST1 }
2019-03-11 06:22:58,825 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 33       mshk.top_TEST3 }

  
  在 c1sink 窗口,可以看到以下信息:

[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:34:05,370 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 01:34:05,377 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 01:34:05,383 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

2019-03-11 01:50:56,054 INFO sink.LoggerSink: Event: { headers:{type=ali} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 32       mshk.top_TEST2 }

  可以看到,根据header中不同的条件分布到不同的channel上
   

案例10:Flume Sink Processors

  Sink Groups允许用户在一个代理中对多个 sink 进行分组。Sink Processor 能够实现分组内的sink负载均衡。以及组内 sink 容错,实现当组内一个 sink 失败时,切换至其他的 sink

  • Default Sink Processor

  默认的 Sink Processor 仅接受单独一个 sink。不必对单个 sink 使用 processor。对单个 sink 可以使用 source-channel-sink 的方式。

  • Failorver Sink Processor

  Failover Sink Processor(容错处理器)拥有一个 sink 的优先级列表,用来保证只有一个 sink 可用。

  容错机制将失败的 sink 放入一个冷却池中,并给他设置一个冷却时间,如果重试中不断失败,冷却时间将不断增加。一旦 sink 成功的发送 eventsink 将被重新保存到一个可用`sink` 池中。在这个可用 `sink` 池中,每一个sink 都有一个关联优先级值,值越大优先级越高。当一个 sink 发送 event 失败时,剩下的 sink 中优先级最高的 sink 将试着发送 event
  failover 的机器是一直发送给其中一个sink,当这个 sink 不可用的时候,自动发送到下一个sink
  
  接下来我们开始继续实验,创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c0 c1

#这个是配置failover的关键,需要有一个sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# 处理的类型是failover
a1.sinkgroups.g1.processor.type = failover
# 优先级,数字越大优先级越高,每个sink的优先级必须不相同
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# 失败的接收器的最大退避时间,设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
a1.sinkgroups.g1.processor.maxpenalty = 10000

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c0 c1
a1.sources.r1.selector.type = replicating

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555

# k2 是 sink 的名称,设置 sink 的类型
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors_avro.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors_avro.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c0

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0

  
  将2个配置文件复制到 c1 上一份

[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Flume_Sink_Processors_avro.conf     100%  485   585.8KB/s   00:00
Flume_Sink_Processors.conf          100% 1175     1.6MB/s   00:00

  
  打开4个窗口,在 c0c1 上同时启动两个 FlumeAgent

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:29:23,481 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 06:29:23,483 INFO source.AvroSource: Avro source r1 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:57:31,989 INFO source.AvroSource: Avro source r1 started.

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:31:44,971 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 06:31:44,971 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 06:31:44,971 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 06:31:44,985 INFO sink.AbstractRpcSink: Rpc sink k2 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:59:51,704 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 01:59:51,704 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 01:59:51,718 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 01:59:51,737 INFO sink.AbstractRpcSink: Rpc sink k2 started.

  
  然后在 c0上,测试产生log

[root@c0 ~]# echo "mshk.top test1 failover" | nc localhost 5140

  
  因为 c1 的优先级高,所以在 c1sink 窗口,可以看到以下信息,而 c0 没有:

[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:57:31,989 INFO source.AvroSource: Avro source r1 started.

2019-03-11 02:02:11,750 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 31 20 66 mshk.top test1 failover }

  
  这时我们停止掉 c1 机器上的 sink (ctrl+c),再次输出测试数据

[root@c0 ~]# echo "mshk.top test2 failover" | nc localhost 5140

  
  可以在 c0sink 窗口,看到读取到了刚才发送的两条测试数据:

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:29:23,481 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 06:29:23,483 INFO source.AvroSource: Avro source r1 started.

2019-03-11 07:09:58,232 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 32 20 66 mshk.top test2 f }

  
  我们再在 c1sink 窗口中,启动 sink

[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

  
  输入两批测试数据:

[root@c0 ~]# echo "mshk.top test3 failover" | nc localhost 5140 && echo "mshk.top test4 failover" | nc localhost 5140

  
  在 c1sink 窗口,我们可以看到以下信息,因为优先级的关系,log消息会再次落到 c1 上:

2019-03-11 02:39:56,644 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 33 20 66 mshk.top test3 f }
2019-03-11 02:39:56,644 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 34 20 66 mshk.top test4 f }

  

案例11:Load balancing Sink Processor

  Load balancing Sink processor(负载均衡处理器)在多个 sink 间实现负载均衡。数据分发到多个活动的 sink,处理器用一个索引化的列表来存储这些 sink 的信息。处理器实现两种数据分发机制,轮循选择机制和随机选择机制。默认的分发机制是轮循选择机制,可以通过配置修改。同时我们可以通过继承AbstractSinkSelector来实现自定义数据分发选择机制。
  选择器按照我们配置的选择机制执行选择 sink。当 sink 失败时,处理器将根据我们配置的选择机制,选择下一个可用的 sink。这种方式中没有黑名单,而是主动尝试每一个可用的 sink。如果所有的 sink 都失败了,选择器将把这些失败传递给 sink的执行者。
  如果设置 backofftrue,处理器将会把失败的 sink 放进黑名单中,并且为失败的 sink 设置一个在黑名单驻留的时间,在这段时间内,sink 将不会被选择接收数据。当超过黑名单驻留时间,如果该 sink 仍然没有应答或者应答迟缓,黑名单驻留时间将以指数的方式增加,以避免长时间等待 sink 应答而阻塞。如果设置 backofffalse,在轮循的方式下,失败的数据将被顺序的传递给下一个 sink,因此数据分发就变成非均衡的了。
  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# 这个是配置Load balancing的关键,需要有一个sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# 失败的接收器是否会以指数方式退回。
a1.sinkgroups.g1.processor.backoff = true
# 轮循机制,必须是round_robin,random或自定义类的FQCN,它继承自AbstractSinkSelector
a1.sinkgroups.g1.processor.selector = round_robin

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1


# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555

# k2 是 sink 的名称,设置 sink 的类型
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors_avro.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors_avro.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c0

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0

  
  将2个配置文件复制到 c1 上一份

[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Load_balancing_Sink_Processors_avr.conf     100%  485   678.9KB/s   00:00
Load_balancing_Sink_Processors.conf         100%  802     1.0MB/s   00:00

  
  打开4个窗口,在 c0c1 上同时启动两个 FlumeAgent

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:18:38,157 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 07:18:38,428 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 07:18:38,429 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 07:18:38,431 INFO source.AvroSource: Avro source r1 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:46:45,515 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 02:46:45,845 INFO source.AvroSource: Avro source r1 started.

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:24:27,506 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
2019-03-11 07:24:27,506 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 07:24:27,506 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 07:24:27,507 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 07:24:27,515 INFO sink.AbstractRpcSink: Rpc sink k2 started.

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:52:32,325 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
2019-03-11 02:52:32,325 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 02:52:32,325 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 02:52:32,326 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 02:52:32,341 INFO sink.AbstractRpcSink: Rpc sink k2 started.

  
  然后在 c0上,测试产生log,一行一行输入,输入太快,容易落到一台机器上

[root@c0 ~]# echo "mshk.top test1" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test2" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test3" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test4" | nc localhost 5140

  
  在 c0sink 窗口,可以看到以下信息

# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:18:38,157 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 07:18:38,428 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 07:18:38,429 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 07:18:38,431 INFO source.AvroSource: Avro source r1 started.

2019-03-11 02:55:16,074 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 31       mshk.top test1 }
2019-03-11 02:55:22,020 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 33       mshk.top test3 }

  
  在 c1sink 窗口,可以看到以下信息:

# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:46:45,515 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 02:46:45,845 INFO source.AvroSource: Avro source r1 started.

2019-03-11 07:27:16,039 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 32       mshk.top test2 }
2019-03-11 07:27:25,042 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 34       mshk.top test4 }

  
  说明轮询模式起到了作用。
  

案例12:Taildir Source

  Taildir Source 是在 Flume 1.7.0 版本推出的组件 ,通过 tail 监控正则表达式匹配目录下的所有文件,并在检测到添加到每个文件的新行后几乎实时地操作。
  如果正在写入新行,则此源将重试读取它们以等待写入完成。Taildir Source 定期以 JSON 格式写入给定位置文件上每个文件的最后读取位置。如果 Flume 由于某种原因停止或停止,它可以从写在现有位置文件上的位置重新开始读取。在其他用例中,Taildir Source 也可以使用给定的位置文件从每个文件的任意位置开始读取。当指定路径上没有位置文件时,默认情况下 Taildir Source 将从每个文件的第一行开始读取。
  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_source.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_source.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
## 以JSON格式文件以记录每个尾部文件的inode,绝对路径和最后位置
a1.sources.r1.positionFile = /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
## 以空格分隔的文件组列表。每个文件组都指示一组要挂起的文件。
a1.sources.r1.filegroups = f1 f2
## 文件组的绝对路径
a1.sources.r1.filegroups.f1 =  /home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log
a1.sources.r1.headers.f1.headerKey1 = value1
## 文件组的绝对路径
a1.sources.r1.filegroups.f2 =/home/work/_app/apache-flume-1.9.0-bin/logs/.*mshk.top.log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
## 是否添加存储绝对路径文件名的标头
a1.sources.r1.fileHeader = true
# 控制从同一文件连续读取的批次数。如果源正在拖尾多个文件,并且其中一个文件以快速写入,则可以防止处理其他文件,因为繁忙文件将在无限循环中读取。在这种情况下,降低此值。
a1.sources.r1.maxBatchCount = 1000

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger

# c1 是 channel 的名称,设置 channel的类型是内存。事件存储在具可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流量,并且在代理发生故障时准备丢失分阶段数据。
a1.channels.c1.type = memory
## Channel 中存储的最大事件数
a1.channels.c1.capacity = 1000
## 每个事件 Channel 从 Source 或提供给 Sink 的最大事件数
a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 到 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/taildir_source.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 12:34:24,138 INFO taildir.ReliableTaildirEventReader: headerTable: {f1={headerKey1=value1}, f2={headerKey1=value2, headerKey2=value2-2}}
2019-03-11 12:34:24,143 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
2019-03-11 12:34:24,144 INFO taildir.ReliableTaildirEventReader: File not found: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json, not updating position
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started

  
  创建指定文件

[root@c0 ~]# echo "hello mshk.top" > $FLUME_HOME/logs/taildir_example.log
[root@c0 ~]# echo "hello mshk.top1" > $FLUME_HOME/logs/abc.mshk.top.log.1

    
  在 c0 的控制台,可以看到以下信息

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/taildir_source.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 12:34:24,138 INFO taildir.ReliableTaildirEventReader: headerTable: {f1={headerKey1=value1}, f2={headerKey1=value2, headerKey2=value2-2}}
2019-03-11 12:34:24,143 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
2019-03-11 12:34:24,144 INFO taildir.ReliableTaildirEventReader: File not found: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json, not updating position
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 12:37:29,191 INFO taildir.ReliableTaildirEventReader: Opening file: /home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log, inode: 1613028097, pos: 0
2019-03-11 12:37:34,156 INFO sink.LoggerSink: Event: { headers:{headerKey1=value1, file=/home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70       hello mshk.top }
2019-03-11 12:37:39,198 INFO taildir.ReliableTaildirEventReader: Opening file: /home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1, inode: 1613028098, pos: 0
2019-03-11 12:37:39,199 INFO sink.LoggerSink: Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31    hello mshk.top1 }

  
  再次查看 /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json 文件,可以看到以下内容

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
[{"inode":1613028097,"pos":15,"file":"/home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log"},{"inode":1613028098,"pos":16,"file":"/home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1"}]

  
  可以看到 taildir_position.json 记录了每个消费位置的元数据,每消费一次便会更新这个文件。
  

案例13:Hbase

 Hbase 配置是从类路径中遇到的第一个 hbase-site.xml 中获取的。实现由配置指定的 HbaseEventSerializer 的类用于将事件转换为 HBase put。然后将这些放置和增量写入HBase。
 如果 Hbase 无法写入某些事件,则接收器将重播该事务中的所有事件。
 Flume 提供了两个序列化器。SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)按原样将事件主体写入HBase,并可选择增加Hbase中的列。RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的正则表达式打破事件体,并将每个部分写入不同的列。
  在测试之前,请先参考<Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建>将 Hbase 启动

  通过下面的命令,在 Hbase 中创建 flume2hbase_mshk_top 表:

[root@c0 ~]# hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
Version 1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec  5 11:54:10 PST 2018

hbase(main):001:0> list
TABLE
mysql2hase_mshk
1 row(s) in 0.1820 seconds

=> ["mysql2hase_mshk"]
hbase(main):002:0> version
1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec  5 11:54:10 PST 2018

hbase(main):003:0> create 'flume2hbase_mshk_top','uid','name'
0 row(s) in 1.3600 seconds

=> Hbase::Table - flume2hbase_mshk_top
hbase(main):004:0> scan 'flume2hbase_mshk_top'
ROW                                                        COLUMN+CELL
0 row(s) in 0.0330 seconds

hbase(main):005:0> quit

  
  创建 /home/work/_app/apache-flume-1.9.0-bin/conf/hbase_simple.conf 文件编辑并保存,内容如下:

[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/hbase_simple.conf
# a1 是 agent 名称,列出 agent 的 source,sink 和 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# r1 是 source 的名称,设置 source 的 channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# k1 是 sink 的名称,设置 sink 的类型
a1.sinks.k1.type = logger
a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume2hbase_mshk_top
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = mshk
a1.sinks.k1.serializer =  org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  
  启动 FlumeAgent 名称是 a1

[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-19 14:22:25,605 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
2019-03-19 14:22:25,605 INFO node.AbstractConfigurationProvider: Creating channels
2019-03-19 14:22:25,610 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
2019-03-19 14:22:25,614 INFO node.AbstractConfigurationProvider: Created channel c1
2019-03-19 14:22:25,614 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
2019-03-19 14:22:25,626 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hbase
2019-03-19 14:22:25,840 INFO hbase.HBaseSink: The write to WAL option is set to: true
2019-03-19 14:22:25,842 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
2019-03-19 14:22:25,847 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@311a0c0d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
...
0x3000003d05c0004, negotiated timeout = 4000
2019-03-19 14:22:27,936 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-19 14:22:27,936 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started

  
  测试产生log

[root@c0 ~]# echo "hello mshk.top from flume" | nc localhost 5140

  
  这时登录到 Hbase 中,可以发现新数据已经插入

[root@c0 ~]# hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
Version 1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec  5 11:54:10 PST 2018

hbase(main):001:0> list
TABLE
flume2hbase_mshk_top
mysql2hase_mshk
2 row(s) in 0.2010 seconds

=> ["flume2hbase_mshk_top", "mysql2hase_mshk"]
hbase(main):002:0> scan 'flume2hbase_mshk_top'
ROW                                                        COLUMN+CELL
 1552977018028-PiTWUgkag4-0                                column=name:payload, timestamp=1552977021290, value=hello mshk.top from flume
1 row(s) in 0.1230 seconds

hbase(main):003:0> quit

  

常见问题

如何让 Flume 以守护进程方式运行

  运行以下命令:

[root@c0 ~]# nohup flume-ng agent -c . -f $FLUME_HOME/conf/nginx_logs.conf -n a1 -Dflume.root.logger=INFO,console &
[1] 10276
[root@c0 ~]# nohup: ignoring input and appending output to ‘nohup.out’

  
  
  
  经过这么多 Flume的例子测试,如果你全部做完后,会发现 Flume 的功能真的很强大,可以进行各种搭配来完成你想要的工作,俗话说师傅领进门,修行在个人,如何能够结合你的产品业务,将 Flume 更好的应用起来,快去动手实践吧。
  
  希望本文对您有帮助,感谢您的支持和阅读我的博客。
  


博文作者:迦壹
博客地址:Flume1.9.0的安装、部署、简单应用(含分布式、与Hadoop3.1.2、Hbase1.4.9的案例)
转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!
  
假设您认为这篇文章对您有帮助,可以通过以下方式进行捐赠,谢谢!
向陌上花开捐赠
比特币地址:1KdgydfKMcFVpicj5w4vyn3T88dwjBst6Y
以太坊地址:0xbB0a92d634D7b9Ac69079ed0e521CC2e0a97c420


说点什么

avatar
  Subscribe  
提醒