最新消息:

Flume进阶之-拦截器Interceptor和监控

IT技术 ipcpu 465浏览

Flume进阶Interceptor和监控.md

一、Flume的Interceptor

Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。

使用拦截器Interceptor之前先要了解Flume中事件(event)的结构:

event是flume传输的最小对象,从source获取数据后会先封装成event,然后将event发送到channel,sink从channel拿event消费。

event由头部(Map headers)和身体(body)两部分组成:Headers部分是一个map,body部分可以是String或者byte[]等。其中body部分是真正存放数据的地方。

Flume的拦截器Interceptor可以在event放到channel之前插入一些header,比如HDFS Sink用到的timestamp,也可以对body内容进行修改或者根据内容过滤。

Flume-NG自带拦截器有多种,常用的有如下:

Timestamp Interceptor, 可以往event的header中插入关键词为timestamp的时间戳。

Host Interceptor, 该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址

Static Interceptor,增加一个static的header并为所有的事件赋值。范围是所有事件。

Search and Replace Interceptor,按照java正则表达式,对event内容进行查找和替换

Regex FilteringInterceptor,该拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。

接下来我们用一个例子来演示Timestamp Interceptor和Host Interceptor

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = true 

最终效果如下,

2018-02-06 18:50:54,818 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{hostname=10.140.12.12, timestamp=1517914254814} body: 69 61 6D 73 74 72 69 6E 67 0D                  iamstring. }

我们看到header中增加了2个字段hostname和timestamp。

接下来我们看一个正则过滤(Regex FilteringInterceptor)的例子:

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
#@过滤掉(删除)所有数字开头的数据

再来看两个替换的例子(Search and Replace Interceptor)

a1.sources.r1.interceptors = search-replace
a1.sources.r1.interceptors.search-replace.type = search_replace
a1.sources.r1.interceptors.search-replace.searchPattern = actionType
a1.sources.r1.interceptors.search-replace.replaceString = AT
#@效果是替换event内容中的actionType为AT
a1.sources.r1.interceptors = search-replace
a1.sources.r1.interceptors.search-replace.type = search_replace
a1.sources.r1.interceptors.search-replace.searchPattern = \"actionType\"
a1.sources.r1.interceptors.search-replace.replaceString = "programid":0,"actionType"
#@效果是在actionType前面加上一些数据

二、Flume的监控

Flume本身内置了一些监控接口,主要有如下几个

JMX Reporting,提供JMX接口的监控数据

Ganglia Reporting,将监控数据发送到Ganglia

JSON Reporting,提供HTTP接口的json格式监控数据

我们平时最常用的就是第三个JSON格式的监控数据,先看下如何开启

nohup bin/flume-ng agent -c conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 

只需要在启动参数加上后面2个参数就可以,flume会在34545启动一个HTTP服务(Jetty)

我们来看下数据都有哪些指标

# curl -s http://127.0.0.1:34545/metrics | jq
{
"SINK.k1": {
    "ConnectionCreatedCount": "2",    //当前有效的连接数
    "ConnectionClosedCount": "1",    //
    "Type": "SINK",            //组件类型是sink
    "BatchCompleteCount": "0",        //成功完成输出的批量event个数
    "BatchEmptyCount": "0",
    "EventDrainAttemptCount": "6",    //sink成功写出到存储的event总数量
    "StartTime": "1517992885268",    //组件启动时间
    "EventDrainSuccessCount": "6",
    "BatchUnderflowCount": "0",    //没有达到batchsize的批量event数目
    "StopTime": "0",
    "ConnectionFailedCount": "0"    //sink端连接失败的次数
  },
  "CHANNEL.c1": {
    "ChannelCapacity": "10000",        //channel的容量
    "ChannelFillPercentage": "0.0",    //channel空间已使用的百分比
    "Type": "CHANNEL",            //组件类型是channel
    "ChannelSize": "0",            //目前channel中event的总数量
    "EventTakeSuccessCount": "6",    //sink成功读取的event的总数量
    "EventTakeAttemptCount": "520",    //sink尝试从channel拉取event的次数
    "StartTime": "1517992885266",    //组件启动时间
    "EventPutAttemptCount": "6",    //source尝试写入channel的次数
    "EventPutSuccessCount": "6",    //source成功写入channel且提交的event总数量
    "StopTime": "0"
  },
  "SOURCE.r1": {
    "KafkaEventGetTimer": "5999",
    "AppendBatchAcceptedCount": "0",
    "AppendReceivedCount": "0",
    "EventAcceptedCount": "6",
    "StartTime": "1517992886425",    //组件启动时间
    "AppendBatchReceivedCount": "0",
    "KafkaCommitTimer": "12",
    "EventReceivedCount": "6",
    "Type": "SOURCE",            //组件类型是source
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendAcceptedCount": "0",
    "StopTime": "0"
  }
}

指标太多,暂时还不能全部理解,channel中的ChannelFillPercentage需要格外注意,要是满了Flume基本上就没法工作了。
以下是Zabbix+Grafana出来的图像。

三、参考资料

http://blog.csdn.net/looklook5/article/details/40588669
http://blog.csdn.net/liuxiao723846/article/details/78135489
http://flume.cn/2016/05/18/flume%E7%9A%84http%E7%9B%91%E6%8E%A7%E5%8F%82%E6%95%B0%E8%AF%B4%E6%98%8E/
http://www.cnblogs.com/lxf20061900/p/3845356.html

转载请注明:IPCPU-网络之路 » Flume进阶之-拦截器Interceptor和监控