其他分享
首页 > 其他分享> > Flume-day03_进阶案例

Flume-day03_进阶案例

作者:互联网

案例六、多路复制

1、将flume复制到node1,node2

[root@master soft]# scp -r flume-1.9.0 node1:`pwd`
[root@master soft]# scp -r flume-1.9.0 node2:`pwd`

2、在node1节点的/usr/local/soft/bigdata17/scripts 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容

a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 4141

a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3

3、在node2节点的 /usr/local/soft/bigdata17/scripts 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容:

a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 4141

a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4

4、在master节点的 /usr/local/soft/bigdata17/scrips 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容

a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = master
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = node2
a2.sinks.k2.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

三台服务器的配置文件建好了,现在就可以启动flume集群了:

先启动node1和node2节点的logger服务端:

flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a4 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console

启动master节点的netcat:

flume-ng agent -n a2 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console

开启netcat后此窗口就不能操作了,再新建一个master窗口启动telnet:

telnet master 44444

master上输入数据:

node1和node2接收数据:

案例七、故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障

1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作

2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink

3)遇到故障时,我们要立即修复

master:

vim guzhang.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666

#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2  
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

node1

a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 5555

a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3

node2

a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 6666

a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4

先启动node1,node2上的

flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a4 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console

再启动master的

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console

master输入数据

telnet master 4444

数据会打到node2

将node2手动关闭,再输入数据,这时候数据打到node1

再将node2启动起来,再输入数据,这时候,node2继续接收

案例八、负载均衡

通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)

使用负载均衡以后,channel会轮训分配任务,减少机器负荷

master上的配置文件:(随机的)

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

案例九、聚合

node1、node2两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log 现在要求:

把node1、node2机器中的access.log、nginx.log、web.log 采集汇总到master机器上然后统一收集到hdfs中。 但是在hdfs中要求的目录为:

/shujia/bigdata17/flumelogs/access/20220616/** 
/shujia/bigdata17/flumelogs/nginx/20180616/** 
/shujia/bigdata17/flumelogs/web/20180616/**

场景分析:

数据流程处理分析:

实现:

node1对应的IP为 192.168.40.120
node2对应的IP为 192.168.40.130
master对应的IP为 192.168.40.110

node1和node2上配置文件

[root@node2 bigdata17]# mkdir -p /usr/local/soft/bigdata17/scrips/taillogs

[root@node2 bigdata17]# touch /usr/local/soft/bigdata17/scrips/taillogs/access.log
[root@node2 bigdata17]# touch /usr/local/soft/bigdata17/scrips/taillogs/nginx.log
[root@node2 bigdata17]# touch /usr/local/soft/bigdata17/scrips/taillogs/web.log
vim exec_source_avro_sink.conf
# Name the components on this agent 
a1.sources = r1 r2 r3 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /usr/local/soft/bigdata17/scrips/taillogs/access.log 
# static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
a1.sources.r1.interceptors.i1.key = type 
a1.sources.r1.interceptors.i1.value = access 

a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /usr/local/soft/bigdata17/scrips/taillogs/nginx.log 
a1.sources.r2.interceptors = i2 
a1.sources.r2.interceptors.i2.type = static 
a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx 

a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /usr/local/soft/bigdata17/scrips/taillogs/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static 
a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web 

# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = master 
a1.sinks.k1.port = 41414 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1 
a1.sources.r3.channels = c1 
a1.sinks.k1.channel = c1

在master上面开发flume配置文件

vim avro_source_hdfs_sink.conf
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# 定义source 
a1.sources.r1.type = avro 
a1.sources.r1.bind = master 
a1.sources.r1.port =41414 
# 添加时间拦截器 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = timestamp

# 定义channels 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# 定义sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://master:9000/shujia/bigdata17/flumelogs/%{type}/%Y%m%d 
a1.sinks.k1.hdfs.filePrefix = events 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
# 时间类型 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件不按条数生成 
a1.sinks.k1.hdfs.rollCount = 0 
# 生成的文件按时间生成 
a1.sinks.k1.hdfs.rollInterval = 30 
# 生成的文件按大小生成 
a1.sinks.k1.hdfs.rollSize = 10485760 
# 批量写入hdfs的个数 
a1.sinks.k1.hdfs.batchSize = 10000 
# flume操作hdfs的线程数(包括新建,写入等) 
a1.sinks.k1.hdfs.threadsPoolSize=10 
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000 


# 组装source、channel、sink 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

采集端文件生成脚本
在node1与node2上面开发shell脚本,模拟数据生成 server.sh

# !/bin/bash 

while true 
	do
	date >> /usr/local/soft/bigdata17/scrips/taillogs/access.log; 
	date >> /usr/local/soft/bigdata17/scrips/taillogs/web.log; 
	date >> /usr/local/soft/bigdata17/scrips/taillogs/nginx.log; 
	sleep 0.5; 
done

顺序启动服务
master启动flume实现数据收集

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./avro_source_hdfs_sink.conf -Dflume.root.logger=INFO,console

node1与node2启动flume实现数据监控

 flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./exec_source_avro_sink.conf -Dflume.root.logger=INFO,console

node1与node2启动生成文件脚本

sh server.sh

案例十、ChannelSelector案例

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating

  1. Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
  2. 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用

replicating类型例子:

a1.sources = r1
a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

multiplexing类型的ChannelSelector例子:

a1.sources = r1
a1.channels = c1 c2

a1.sources.source1.selector.type = multiplexing
a1.sources.source1.selector.header = title # 以header中的title对应的值作为条件
a1.sources.source1.selector.mapping.a = c2 # 如果header中title的值为a,使用c2这个channel
a1.sources.source1.selector.mapping.b = c1 # 如果header中title的值为b,使用c1这个channel
a1.sources.source1.selector.default = c1 # 默认使用c1这个channel

SinkProcessor

SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor

DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

自定义Interceptor

使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

需求:

在该案例中,我们以端口数据模拟日志,模拟不同类型的日志,我们需要自定义interceptor区分内容是否包含shujia,将其分别发往不同的分析系统(Channel)。

实现代码

package com.shujia.log2flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 *  1. 如何自定义拦截器?
 *   flume的自定义拦截器需要实现Flume提供的Interceptor接口.
 *
 *  实现抽象方法:
 *      initialize: 完成一些初始化工作.
 *      close: 完成一些善后的工作
 *      intercept:拦截器的核心处理方法.  拦截的逻辑.
 *          intercept(Event event) : 单个event的拦截处理
 *          intercept(List<Event> events): 批次event的拦截处理
 *
 *  2. 拦截器的对象如何实例化?
 *    在拦截器中定义一个static的内部类,实现Flume提供的Builder接口
 *
 *   实现抽象方法:
 *      build : 用于构建拦截器对象
 *      configure:用于读取配置信息(xxxx.conf)
 *
 *
 *
 */
public class LogDataInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    /*
        判断变成event的数据中是否包含shujia字符串,event==>header({}) + body(数据)
        如果包含,给event中的header中添加一个key-value: name/title/key ===  sj
        如果不包含,给event中的header中添加一个key-value: name/title/key ===  nsj
     */
    @Override
    public Event intercept(Event event) {
        //如何取出event中的header和body呢?
        //    Map<String, String> getHeaders();
        //    void setHeaders(Map<String, String> var1);
        //    byte[] getBody();
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());

        //判断body是否包含shujia
        if(body.contains("shujia")){
            headers.put("title","sj");
        }else {
            headers.put("title","nsj");
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }
    
    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
           return new LogDataInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

引入依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

将代码打成jar包

将jar包放在flume的lib目录下。简单暴力,但是不方便管理

配置文件

1.进阶案例 - channel选择器 - 多路
a3 ==> a3.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1 

a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 

a2 ==> a2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type =logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


a1 ==> a1.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444

#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = title
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c2里
a1.sources.r1.selector.mapping.wt = c1
a1.sources.r1.selector.mapping.n = c2
a1.sources.r1.selector.default=c2
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = com.shujia.LogDataInterceptor$MyBuilder
# 组装channel与source
a1.sources.r1.channels = c1 c2 



a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666


a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

启动

先启动node1和node2上面的flume

flume-ng agent -n a2 -c ../../flume-1.9.0/conf -f ./a2.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./a3.conf -Dflume.root.logger=INFO,console

最后启动master上面的flume

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./a3.conf -Dflume.root.logger=INFO,console

标签:Flume,channels,进阶,day03,a1,sources,c1,type,sinks
来源: https://www.cnblogs.com/atao-BigData/p/16387613.html