其他分享
首页 > 其他分享> > 反扒

反扒

作者:互联网

反爬虫项目开发

项目介绍

项目背景

为什么要有反爬虫项目

爬虫程序大量占用我们的系统资源,比如带宽/计算能力等

爬虫程序进行预订/抢票影响我们的正常业务.

导入反爬WEB工程

  1. 创建一个maven工程
  2. 创建一个module
  3. 将反扒参考资料\项目代码\WebProject中的代码拷贝到新建的module中
  4. 运行项目
  5. 报错:
  6. 使用 反扒参考资料\DB\gciantispider.sql 创建数据库的表
    • 如果报getChildLst函数找不到,将SQL中下方的语句删除掉
  7. 启动项目,通过http://localhost:8081访问工程
  8. 用户名root,密码000000

反爬虫项目数据流程走向

逻辑架构

功能描述

数据管理模块

  1. 数据采集
    负责各采集服务器的状态信息展示,比如:服务器名称/当前活跃连接数/最近3天信息采集量
  2. 数据处理
    对查询/预定.国内/国际不同方式的数据解析规则进行配置

流程管理

  1. 对流程管理,负责流程的增/删/改/查/启动流程
  2. 对规则的配置,是否启用规则,规则的标准,规则的打分
  3. 对策略的分值进行控制

非功能性描述

数据的组成:

点击流(信息采集服务器)/业务日志(业务服务器)/第三方对接

数据量级计算方式:

  1. 估算每条数据的大小,(1-2K比较合适),如1.5K
  2. 估算明天数据的条数.如8亿条
  3. 8亿条* 1.5K=每天的数据量约1T左右.
  4. 如果数据全量备份,有3个备份,每天产生3T的数据
  5. 如果每天服务器有100T的磁盘,总共有15台服务器,总共能存储500天的数据
  6. 假如一天有500G数据,7台机器就行了(32核处理器/128G内存/32T硬盘)

峰值数数据量有多大?

将8亿条平均到每秒,峰值每秒20万

500万用户,日活20万,如果每个用户点击30,600万次点击,业务日志6000万,总共数据量6600万

公司集群分类:

数据库ER图

PowerDesigner的使用

  1. 安装:找到安装文件反扒参考资料\PowerDesigner
  2. 新建一个model

  3. 导出SQL文件

防爬规则

数据源

数据源作为反扒的各种指标计算来源,主要包含用户请求携带的各种参数.比如:用户来源的URL,用请求的URL,用户的SessionID.查询相关的出发地/目的地/出发时间

防爬规则

  1. 根据每5分钟查询不同的出发地/目的地
  2. 根据每5分钟查询的时间最小间隔小于2秒钟
  3. 根据每5分钟查询的时间查询的次数大于30次

爬虫程序的特点

  1. 爬虫程序经常在节假日等高峰期进行爬取
  2. 爬虫比较稳定,不区分白天和晚上
  3. 爬虫经常切换出发地/目的地
  4. 爬虫爬取频次比较高
  5. 爬虫携带的浏览路径信息不完整

数据采集

安装Openresty

  1. 到目录反扒参考资料\OpenResty找到openresty-1.13.6.1.tar.gz
  2. 解压 tar -xzvf openresty-1.13.6.1.tar.gz
  3. 配置Openresty

    ./configure --prefix=/usr/local/openresty --with-http_stub_status_module

  4. make && make install
  5. 如果缺少依赖,安装依赖

    yum install readline-devel pcre-devel openssl-devel perl gcc

如果不想自己编译,资料中 反扒参考资料\OpenResty\编译后\openresty是编译好的.直接放入Linux中就可以使用

  1. 进入/export/servers/openresty/nginx目录下
  2. 执行sbin/nginx执行程序(使用chmod u+x nginx赋予执行权限)
  3. 通过浏览器访问:

Lua语法入门

使用方式

交互方式

  1. 使用lua命令启动交互窗口
  2. 输入执行的命令print(“hello world”)

文件方式

  1. 创建lua脚本文件
    vim helloworld.lua
  2. 执行脚本文件
    lua helloworld.lua

数据类型

java中的数据类型

数字相关的:
    整型:
        byte/int/short/long
    浮点型:
        float/double
布尔类型: boolean
字符类型: char

Lua 数据类型

nil:==java中的null
boolean:布尔类型
number:数字类型,不区分整型和浮点型
string:字符串
userdata: C的数据结构
function:函数(方法)
thread:线程
table:集合/数组/Map

Lua运算符

赋值运算符

--赋值运算符
a = 3
b = "hello"
print(a)
print(b)

c,d = 1,2
print(c,d)

e,f = 1,2,3
print(e,f)

g,h = 1
print(g,h)

str = "hello" .. "world"
print(str)

字符串的拼接操作不能使用"+",应该使用".."进行拼接

算术运算符

-- 算术运算符
-- 加减乘除取余
a,b = 10,20

print("a+b=" .. a + b)
print("a-b=" .. a - b)
print("a*b=" .. a * b)
print("a/b=" .. a / b)
print("a%b=" .. a % b)

关系运算符

-- 关系运算符
print("========= 关系运算符 =========")
a,b = 1,2

print("a等于b" .. a == b)
print("a不等于b" .. a ~= b)
print(a > b)
print(a >= b)
print(a < b)
print(a <= b)
  1. 不等于需要~=来表示
  2. 字符串不能和关系运算符进行拼接

逻辑预算符

-- 逻辑运算符
print("========= 逻辑运算符 =========")
a,b = true,false

print(a and b)
print(a or b)
print (not b)

其它运算符

  1. ".."拼接字符串
  2. "#"用来获取字符串或者集合的长度

    --其它运算符
    print("========其它运算符===========")

    str = "hello java"
    print(#str)

Lua流程控制

if条件判断

-- 条件控制
-- if条件判断
a = 10
if a > 5
then
        print("a>5")
end

-- if - else 条件判断
if a > 11
then
        print("a>11")
else
        print("a<=11")
end

-- if 嵌套
if a > 5
then
        print("a>5")
        if a < 12
        then
                print("a<12")
        end
end

循环

while循环

--While循环
print("=======while循环=======")
while a > 0
do
        print(a)
        a = a - 1
end

repeat循环

-- repeat循环
print("=======repeat循环=======")

repeat
        print(a)
        a = a + 1
until a > 10

repeat循环最少执行一次

假如一张纸的厚度为0.04,累计叠多少次才能超过珠穆朗玛峰的高度8847.

for循环

--for循环
print("=======for循环=======")
for b = 10, 1,-1
do
        print(b)
end

for循环后面3个参数

  1. 初始化变量
  2. 循环终止条件(包含等于的条件)
  3. 步长,默认情况下此参数可以省略,默认为1

Lua的数组

--lua的数组
arr = {"zhangsan","lisi","wangwu"}
print(#arr)

for a = 1,#arr
do
        print(arr[a])
end

--使用泛型for循环
-- i是索引
-- name 该索引对应的值
for i,name in ipairs(arr)
do
        print(i,name)
end

注意:

使用ipairs的时候,只能针对于集合/数组

遍历Map数据结构的时候需要使用pairs关键字

Lua的类型转换

其它类型转换为字符串

一般都是将数字转换为字符串

function/table默认不能转为字符串

其它类型转换为数字

-- 其它类型转数字:
-- tonumber()
num = "12"
print(type(num))
print(type(tonumber(num)))
print(tonumber(num))

num = "AF"
print(type(num))
print(type(tonumber(num,16)))
print(tonumber(num,16))

tbl = {"tom","cat"}
print(tonumber(tbl))

boo = false
print(tonumber(boo))

一般非数字格式的都转换不了,比如布尔类型/table/"hello"

Lua的函数

Lua函数定义方式:

函数作用范围 function 函数名字(参数1,参数2...)

        函数体

        return 结果1, 结果2 ...

end

--Lua的函数定义
function f1(a,b)
        print("hello function")
        return a+b
end
result = f1(3,4)
print(result)

--多个返回值
local function f2(a,b)
        return a,b
end
c,d = f2(1,2)
print(c,d)

Lua变量的作用范围

Lua变量默认作用范围是全局的,

加了local关键字之后就变成了局部的,

如果使用全局变量,需要注意变量名不要定义重复了,原来的变量会被替换掉

-- 变量的作用范围
a = 10
if a>3
then
        b = 20
        local c = 30
        print(a)
        print(b)
        print(c)
end
a = "hello"
print(a)
print(b)
print(c)  -- nil

Lua的Table

Lua的table可以代表java中的数组/list/Map类型的数据结构

如果table中是数组格式的数据,遍历的时候应该使用ipairs关键字,如果是Map数据结构,使用paris关键字

--定义一个集合table
local arr = {"zhangsan","lisi","wangwu"}
print(arr[1])
--使用索引遍历table
for i = 1, #arr
do
        print(arr[i])
end
print("========泛型方式遍历=========")
for index, value in ipairs(arr)
do
        print(index, value)
end

print("========Map类型数据结构=========")
map = {name="zhangsan", sex="男", age = 13}
print(map["name"])
print(map.name)
print(map.age)
-- 赋值操作,可以通过"."变量的形式进行赋值或者取值
map.address = "深圳"
print(map.address)

print("========使用循环遍历Map数据结构=========")
for key, value in pairs(map)
do
        print(key, value)
end

Lua的模块

Lua的模块功能依赖于table,先定义一个空的table来存储成员变量或者函数

引用模块的时候使用require关键字,require空格"模块名字"注意不需要".lua"后缀名

模拟向Kafka发送消息

kafka.lua

-- 模拟向Kafka发送消息
_M = {}
--默认分区数量
_M.default_partition_num = 5
function _M.new(props)
        -- 根据传入的props,创建客户端
        return "Kafka client ..."
end
-- 向Kafka发送消息
function _M.send(topic, key, message)
        print("正在向Kafka发送消息,Topic为:"..topic..",消息体为:"..message)
        -- 根据发送结果,返回状态信息,方便做出判断
        return nil,"error"
end

testKafka.lua

-- 模拟测试引入自定义的Kafka模块
require "Kafka"
dpn = _M.default_partition_num
print("默认分区数为:"..dpn)
--创建客户端对象
--需要传入props
props = {{hosts="192.168.80.81", port="9092"},{hosts="192.168.80.81", port="9092"}}
_M.new(props)
--发送消息
ok, err = _M.send("sz07", "1", "向Kafka发送测试消息")
if not ok
then
        --如果结果不正常,打印一下错误信息
        print(err)
        return
end

Lua和Nginx的整合

Lua结合Nginx的2种方式

Lua代码块

location / {
            #root   html;
            #index  index.html index.htm;
            default_type text/html;
            content_by_lua_block{
               #编写lua代码
               print("hello")
               ngx.say("hello openresty")
            }
        }

Lua脚本文件

location / {
            #root   html;
            #index  index.html index.htm;
            default_type text/html;
            content_by_lua_file /export/servers/openresty/test.lua;
        }

content_by_lua_file /export/servers/openresty/test.lua;

最后又一个";"号别忘记写了

Lua获取Http请求参数

获取Get请求参数

-- 使用Lua获取Http请求参数
-- get请求参数的获取
getArgs = ngx.req.get_uri_args()
--获取参数信息
for k,v in pairs(getArgs)
do
        ngx.say("参数名:"..k.." 参数值:"..v)
        ngx.say("<br>")
end

获取Post请求参数

ngx.say("=======获取Post请求参数========")
-- post请求参数的获取
-- 想要读取请求体内容,需要先调用read_body()方法
ngx.req.read_body()

postArgs = ngx.req.get_post_args()
--获取参数信息
for k,v in pairs(postArgs)
do
        ngx.say("参数名:"..k.." 参数值:"..v)
        ngx.say("<br>")
end

凡是涉及到操作请求体的动作,都需要先调用ngx.req.read_body()方法

获取请求头参数

ngx.say("=======获取请求头参数========")

headerArgs = ngx.req.get_headers()
for k,v in pairs(headerArgs)
do
        ngx.say("参数名:"..k.." 参数值:"..v)
        ngx.say("<br>")
end

获取请求体内容(针对于JSON请求参数)

ngx.say("=======获取请求体内容========")
-- 必须先调用read_body()方法
ngx.req.read_body()
bodyData = ngx.req.get_body_data()
-- 因为如果是JSON的请求体内容,没有办法直接遍历,所以直接输出
ngx.say(bodyData)

使用Lua连接MySQL

先引用MySQL模块.位置在:openresty/lualib/resty/mysql.lua

-- 连接MySQL操作
-- 引入MySQL的模块
local restyMysql = require "resty.mysql"
-- Lua调用方法默认用"."就可以了,但如果第一个参数是self,那么可以通过":"来调用,就可以省略掉第一个self参数
local mysql = restyMysql:new()
--设置连接超时时间
mysql:set_timeout(20000)
--开始连接MySQL
--定义连接MySQL的配置
local opts = {}
opts.host = "192.168.80.81"
opts.port = 3306
opts.database = "test"
opts.user = "root"
opts.password = "root"
local ok, err = mysql:connect(opts)
if not ok
then
        ngx.say("连接MySQL失败" .. err)
        return
end
--定义SQL
local sql = "select * from user"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("查询数据失败:" .. err)
        return
end
-- 从查询结果中获取数据
for i,row in ipairs(result)
do
        for key,value in pairs(row)
        do
                ngx.say("列名:"..key.." 值为:" .. value)
        end
        ngx.say("<br>")
end

ngx.say("所有数据打印完毕")

对MySQL进行增删改操作

--新增数据
local sql = "insert into user values('lisi','123','深圳','0','2019-01-01')"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("插入数据失败:" .. err)
        return
end
ngx.say("数据插入成功")

--删除数据
local sql = "delete from user where username='lisi'"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("数据删除失败:" .. err)
        return
end

ngx.say("数据删除成功")
for i,row in ipairs(result)
do
        for key,value in pairs(row)
        do
                ngx.say("列名:"..key.." 值为:" .. value)
        end
        ngx.say("<br>")
end

--修改数据
local sql = "update user set username = 'lisi' where username='zhangsan'"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("数据修改失败:" .. err)
        return
end

ngx.say("数据修改成功")

使用Lua连接Redis

redis单机安装

Redis是基于内存的NoSQL的数据库,里面存储的都是键值对.

如果不想编译安装,可以使用反扒参考资料\Redis\redis-5.0.4直接拷贝到虚拟机中使用.

  1. 将redis-5.0.4拷贝到/export/servers目录下
  2. 进入bin目录,执行chmod u+x ./*命令赋予执行权限

redis.conf配置文件

#绑定的主机地址
bind 0.0.0.0
#绑定的端口号
port 6379
#后台运行,默认情况下,redis服务器独占一个进程窗口
daemonize yes
#redis进程文件所在目录
pidfile /var/run/redis_6379.pid
#redis备份文件
dbfilename dump.rdb
  1. 启动Redis服务端

    ./redis-server redis.conf

  2. 查看redis状态

    ps -ef | grep redis

  3. 连接Redis

    ./redis-cli

Lua连接Redis

--使用Lua连接Redis
--引用Redis的模块
local restyRedis = require "resty.redis"
--调用new方法创建redis客户端
local redis = restyRedis:new()
--设置超时时间
redis:set_timeout(20000)
--创建连接
ok,err = redis:connect("192.168.80.83", 6379)
if not ok
then
        ngx.say("连接失败"..err)
        return
end

-- 连接成功
ok, err = redis:set("username", "zhangsan")
if not ok
then
        ngx.say("设置失败"..err)
        return
end
ngx.say("设置成功")

--获取Redis数据
ok, err = redis:get("username")
if not ok
then
        ngx.say("获取失败"..err)
        return
end
ngx.say(ok)

Redis集群

运行原理

  1. redis各个节点会相互通信,每个节点都会开启2个端口,一个端口用于和客户端通信,一个端口用于内部通信,内部通信端口比客户端端口多10000.
  2. 每个节点分配一定数量的槽,槽的总数量是16384.
  3. 如果有连接需要存入数据,当前连接的Redis节点会先按照一定的算法,得到一个Key的值,比如155533.接下来使用155533%16384,得到的结果看在哪台机器上,
  4. 就将当前的数据存入计算结果对应的机器上.
  5. 如果去集群中取值,一样需要计算,假如数据没有在当前连接的节点上,会将当前的连接重定向到数据所在的节点.

集群搭建

参考反扒参考资料\Redis\Redis集群搭建步骤.md

每个节点的文件夹下面都有一个700x.conf

每个配置文件中都有一些路径相关的配置,所以尽量安装课程去存放,否则需要手动修改路径

7001.conf:

port 7001
dir /export/servers/redis-5.0.4/cluster/7001/data
cluster-enabled yescluster-config-file /export/servers/redis-5.0.4/cluster/7001/data/nodes.conf

启动集群:

bin/redis-server cluster/7001/7001.conf
bin/redis-server cluster/7002/7002.conf
bin/redis-server cluster/7003/7003.conf 
bin/redis-server cluster/7004/7004.conf 
bin/redis-server cluster/7005/7005.conf 
bin/redis-server cluster/7006/7006.conf

通过netstat -nltp查看集群状态

初始化:

如果服务端第一次启动后,直接使用客户端去连接,存入数据,这个时候会报错,报槽没有分配错误

下面的初始化操作,只需要第一次运行的时候执行,以后不需要再重复执行

-- 将下方的192.168.80.81替换为自己的IP地址
bin/redis-cli --cluster create --cluster-replicas 1 你的机器IP:7001 192.168.80.83:7002 192.168.80.83:7003 192.168.80.83:7004 192.168.80.83:7005 192.168.80.83:7006

--cluster-replicas 1指定副本数为1个

连接集群:

bin/redis-cli -c -p 7001
set hello world
get hello

-c 指定我是要连接集群,如果不添加此参数,会造成重定向失败

-p 指定连接的端口号

使用Lua连接Kafka

编写Lua脚本

-- 连接Kafka发送消息
-- 引用Kafka模块
local kafka = require "resty.kafka.producer"
--创建producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
--发送数据
local ok, err = producer:send("test", "1", "hello openresty")
if not ok
then
        ngx.say("Kafka发送失败"..err)
        return
end

ngx.say("消息发送成功")

启动Kafka集群

  1. 先启动zookeeper

    zkServer.sh start

  2. 启动Kafka

    nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &

/dev/null 指定消息输出的目录

2>&1 将错误消息转换为标准输出

& 后台运行

  1. 显示所有的Topic

    /export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --list

  2. 启动console-consumer

    /export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic test

编写Lua脚本进行信息采集

修改nginx.conf

http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;
    
    
    #开启共享词典功能, 开启的空间为10Mb大小,因为我们只是存储一些数字,10Mb够用了
    lua_shared_dict shared_data 10m;
    #配置本地域名解析
    resolver 127.0.0.1;
    
    
        server {
                listen       80;
                server_name  localhost;

                #charset koi8-r;

                #access_log  logs/host.access.log  main;

                location / {
                    #root   html;
                    #index  index.html index.htm;
                    
                    
                    #开启 nginx 监控
                    stub_status on;
                    default_type text/html;
                    #content_by_lua_block{
                    #   print("hello")
                    #   ngx.say("hello openresty")
                    #}
                    content_by_lua_file /export/servers/openresty/mylua/controller.lua;
                    
                    
                }

编写controller.lua

--过载保护功能,如果连接超出一定范围,不再进行信息采集
--定义过载的最大值
local maxConnectNum = 10000
--获取当前连接数量
local currentConnect = tonumber(ngx.var.connections_active)
--如果当前连接数大于过载范围,不再进行信息采集
if currentConnect > maxConnectNum
then
        return
end


-- 均衡分区操作
--定义Kafka分区数量
local partition_num = 6
--定义共享词典中的变量名
local sharedKey = "publicValue"
--共享词典操作对象
local shared_data = ngx.shared.shared_data
--从共享词典中取出数据
local num = shared_data:get(sharedKey)
--如果第一运行,num没有值
if not num
then
        --初始化一个值存入共享词典
        num = 0
        shared_data:set(sharedKey, 0)
end

--进行取余操作,确定分区ID
local patitionID = num % partition_num
--调用共享词典自带的自增功能进行累加
shared_data:incr(sharedKey, 1)

-- 数据采集
-- 获取当前系统时间
local time_local = ngx.var.time_local
if time_local == nil then
time_local = "" end
-- 请求的URL
local request = ngx.var.request
if request == nil then
request = "" end
-- 获取请求方式
local request_method = ngx.var.request_method
if request_method == nil then
request_method = "" end
-- 获取请求的内容类型,text/html,application/json
local content_type = ngx.var.content_type
if content_type == nil then
content_type = "" end
-- 读取请求体内容
ngx.req.read_body()
--获取请求体数据
local request_body = ngx.var.request_body
if request_body == nil then
request_body = "" end
-- 获取来源的URL
local http_referer = ngx.var.http_referer
if http_referer == nil then
http_referer = "" end
-- 客户端的IP地址
local remote_addr = ngx.var.remote_addr
if remote_addr == nil then
remote_addr = "" end
-- 获取请求携带的UA信息
local http_user_agent = ngx.var.http_user_agent
if http_user_agent == nil then
http_user_agent = "" end
-- 请求携带的时间
local time_iso8601 = ngx.var.time_iso8601
if time_iso8601 == nil then
time_iso8601 = "" end
-- 请求的IP地址(服务器地址)
local server_addr = ngx.var.server_addr
if server_addr == nil then
server_addr = "" end
--获取用户的Cookie信息
local http_cookie = ngx.var.http_cookie
if http_cookie == nil then
http_cookie = "" end
--封装数据
local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;

-- 连接Kafka,将message发送出去
-- 引用Kafka模块
local kafka = require "resty.kafka.producer"
--创建producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
--发送数据(主题,key(使用partitionid(0-5)作为key),消息)
local ok, err = producer:send("sz07", tostring(patitionID), message)
if not ok
then
        ngx.say("Kafka发送失败"..err)
        return
end

注意:

分区数量使用Lua无法指定,需要使用kafka脚本手动指定

查看topic操作的帮助
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --help
修改分区数量为6:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --alter --partitions 6 --topic sz07

数据预处理

获取Kafka中的消息

创建工程

导入pom.xml配置文件

导入配置文件

将反扒参考资料\配置文件目录下的文件拷贝到项目resources目录下

修改配置文件中的IP相关配置

导入项目需要的实体类以及工具类

将反扒参考资料\工具包中的类拷贝到项目中

消费Kafka数据的2种方式

链路统计

编写主程序APP

package com.air.antispider.stream.dataprocess

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.businessprocess.BusinessProcess
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * 数据预处理的主程序
  */
object DataProcessApp {

  def main(args: Array[String]): Unit = {

    //创建Spark配置对象
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataProcessApp")

    //创建SparkStreamingContext对象
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //消费Kafka消息,有几种方式?2种

    var kafkaParams = Map[String, String]()

    //从kafkaConfig.properties配置文件中获取broker列表信息
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")

    kafkaParams += ("metadata.broker.list" -> brokerList)

    val topics = Set[String]("sz07")

    //使用Direct方式从Kafka中消费数据
    //StringDecoder:默认情况下,java的序列化性能不高,Kafka为了提高序列化性能,需要使用kafka自己的序列化机制
    val inputDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    //获取的消息是(key,message)的形式,
    val messageDStream: DStream[String] = inputDStream.map(_._2)

    messageDStream.foreachRDD(messageRDD =>{
      //开启链路统计功能
      BusinessProcess.linkCount(messageRDD)
      messageRDD.foreach(println)
    })

    //启动Spark程序
    ssc.start()
    ssc.awaitTermination()
  }
}

链路统计代码

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.Date

import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

/**
  * 链路统计功能
  */
object BusinessProcess {
  def linkCount(messageRDD: RDD[String]) = {
    //信息采集量
    val serverCountRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 9) {
        //有数据
        val serverIP = arr(9)
        //(ip,1次)
        (serverIP, 1)
      } else {
        ("", 1)
      }
    })
      //按照Key进行累加操作
      .reduceByKey(_ + _)

    //当前活跃连接数
    val activeNumRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 11) {
        //取IP
        val serverIP = arr(9)
        //取本IP的活跃连接数量
        val activeNum = arr(11)
        //(ip,1次)
        (serverIP, activeNum.toInt)
      } else {
        ("", 1)
      }
    })
      //舍弃一个值,主需要一个活跃连接数就ok了
      .reduceByKey((x, y) => y)

    //进行数据展示
    //通过跟踪java代码,发现我们需要封装一个json数据,存入Redis中,让前端进行数据展示
    if (!serverCountRDD.isEmpty() && !activeNumRDD.isEmpty()) {
      //如果数据不为空,开始数据处理
      //将RDD的结果转换为Map
      val serversCountMap: collection.Map[String, Int] = serverCountRDD.collectAsMap()
      val activeNumMap: collection.Map[String, Int] = activeNumRDD.collectAsMap()

      val map = Map[String, collection.Map[String, Int]](
        "serversCountMap" -> serversCountMap,
        "activeNumMap" -> activeNumMap
      )

      //将map转换为JSON
      val jsonData: String = Json(DefaultFormats).write(map)

      //将jsonData存入Redis中
      //获取Redis连接
      val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
      //存入数据
      //使用CSANTI_MONITOR_LP + 时间戳   格式来作为Key
      val key: String = PropertiesUtil.getStringByKey("cluster.key.monitor.linkProcess", "jedisConfig.properties") + new Date().getTime
      val ex: Int = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
      //当前数据是以天为单位进行存储的,所以有效时间,设置为1天就行了
//      jedis.set(key, jsonData)
      //设置超时时间为2分钟
      jedis.setex(key, ex, jsonData)
    }
  }
}

URL过滤

流程:

1. 先获取MySQL数据库中的URL过滤规则
  1. 将过滤规则放入广播变量
    1. 根据Redis的状态更新广播变量
      1. 使用过滤规则的广播变量实现过滤功能

代码编写

  1. 先获取MySQL数据库中的URL过滤规则

代码:

import com.air.antispider.stream.common.util.database.QueryDB

import scala.collection.mutable.ArrayBuffer

/**
  * 加载MySQL中的规则,方便Spark进行计算
  */
object AnalyzeRuleDB {

  /**
    * 获取MySQL中的URL过滤规则
    */
  def getFilterRule(): ArrayBuffer[String]  = {
    val sql = "select value from nh_filter_rule"
    val field = "value"
    //查询数据库的value列
    val filterRule: ArrayBuffer[String] = QueryDB.queryData(sql, field)
    filterRule
  }
}
  1. 将过滤规则放入广播变量

在创建SparkContext之后,获取Kafka数据之前,加载数据库的信息,放入广播变量

 //加载数据库规则,放入广播变量
    val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
    //将过滤规则列表放入广播变量
    //@volatile 让多个线程能够安全的修改广播变量
    @volatile var filterRuleBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(filterRuleList)

@volatile注解:

  1. 更新广播变量

    //先检查数据库,更新广播变量
    var filterRuleChangeFlag = jedis.get("FilterRuleChangeFlag")
    //检查标记是否存在
    if (StringUtils.isBlank(filterRuleChangeFlag)) {
    filterRuleChangeFlag = "true"
    //重新设置到Redis中
    jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
    }
    //更新广播变量
    if (filterRuleChangeFlag.toBoolean) {
    //FilterRuleChangeFlag为true,代表需要重新更新广播变量
    //加载数据库规则,放入广播变量
    val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
    //将过滤规则列表放入广播变量
    //@volatile 让多个线程能够安全的修改广播变量
    filterRuleBroadcast = sc.broadcast(filterRuleList)
    filterRuleChangeFlag = "false"
    jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
    }

  2. 创建URLFilter过滤类

    import scala.collection.mutable.ArrayBuffer

    /**
    • 使用广播变量,实现URL过滤功能
      */
      object URLFilter {
      /**
      • @param message 原始数据
      • @param filterRulList 过滤规则
        */
        def filterURL(message: String, filterRulList: ArrayBuffer[String]): Boolean = {
        //看当前的message是否匹配filterRuleList
        //先取出message中的URL
        var url = ""
        val arr: Array[String] = message.split("#CS#")
        if (arr.length > 1) {
        val arrTemp: Array[String] = arr(1).split(" ")
        if (arrTemp.length > 1) {
        url = arrTemp(1)
        }
        }
        //判断是否能取出URL,
        if (StringUtils.isBlank(url)) {
        return false
        }
        //遍历filterRulList
        for (filterRule <- filterRulList) {
        if (url.matches(filterRule)) {
        return false
        }
        }
        //如果整个集合都遍历完了,还没有return,那肯定是没有一个能匹配上
        return true
        }
        }
  3. 在主程序中引用URLFilter过滤类

    //URL过滤功能
    val filterRDD: RDD[String] = messageRDD.filter(message => URLFilter.filterURL(message, filterRuleBroadcast.value))

数据加密操作

代码编写:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}
import com.air.antispider.stream.common.util.decode.MD5
import org.apache.spark.rdd.RDD

/**
  * 对用户的敏感信息进行加密操作
  */
object EncryptedData {
  /**
    * 加密身份证号
    * @param encryptedPhoneRDD
    * @return
    */
  def encryptedID(encryptedPhoneRDD: RDD[String]): RDD[String] = {
    //如何找到手机号
    encryptedPhoneRDD.map(message => {
      //创建加密对象
      val md5 = new MD5
      //找message中的手机号
      //可以使用正则表达式来找
      val pattern: Pattern = Pattern.compile("(\\d{18})|(\\d{17}(\\d|X|x))|(\\d{15})")
      //使用正则对象,对message进行匹配,matcher是匹配结果
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
      //      while (iterator.hasNext()) {
      //        iterator.next()
      //      }
      //循环结果,看有没有匹配到的数据
      while (matcher.find()) {
        //取出匹配结果
        val id: String = matcher.group()
        //加密/替换
        val encryptedID: String = md5.getMD5ofStr(id)
        tempMessage = tempMessage.replace(id, encryptedID)
      }
      //返回加密之后的数据
      tempMessage
    })
  }

  //手机号加密
  def encryptedPhone(filterRDD: RDD[String]): RDD[String] = {

    //如何找到手机号
    filterRDD.map(message => {
      //创建加密对象
      val md5 = new MD5
      //找message中的手机号
      //可以使用正则表达式来找
      val pattern: Pattern = Pattern.compile("((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(17[0-9])|(18[0,5-9]))\\d{8}")
      //使用正则对象,对message进行匹配,matcher是匹配结果
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
//      while (iterator.hasNext()) {
//        iterator.next()
//      }
      //循环结果,看有没有匹配到的数据
      while (matcher.find()) {
        //取出匹配结果
        val phone: String = matcher.group()
        //加密/替换
        val encryptedPhone: String = md5.getMD5ofStr(phone)
        tempMessage = tempMessage.replace(phone, encryptedPhone)
      }
      //返回加密之后的数据
      tempMessage
    })
  }
}

主程序:

  //进行数据脱敏操作
      //加密手机号
      val encryptedPhoneRDD: RDD[String] = EncryptedData.encryptedPhone(filterRDD)
      //加密身份证号
      val encryptedRDD: RDD[String] = EncryptedData.encryptedID(encryptedPhoneRDD)

数据切割操作

代码:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.Pattern

import com.air.antispider.stream.common.util.decode.{EscapeToolBox, RequestDecoder}
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.string.CsairStringUtils

/**
  * 数据切割主程序
  */
object DataSplit {

  /**
    * 将源数据进行切割,得到具体的参数
    * @param message
    * @return
    */
  def split(message: String):(String,String,String,String,String,String,String,String,String,String,String,String ) = {
    val values: Array[String] = message.split("#CS#")
    //从arr中取出这12个参数,进行赋值操作
    //记录数据长度
    val valuesLength = values.length
    //request 原始数据
    val regionalRequest = if (valuesLength > 1) values(1) else ""
    //分割出 request 中的 url
    val request = if (regionalRequest.split(" ").length > 1) {
      regionalRequest.split(" ")(1)
    } else { ""
    }
    //请求方式 GET/POST
    val requestMethod = if (valuesLength > 2) values(2) else ""
    //content_type
    val contentType = if (valuesLength > 3) values(3) else ""
    //Post 提交的数据体
    val requestBody = if (valuesLength > 4) values(4) else ""
    //http_referrer
    val httpReferrer = if (valuesLength > 5) values(5) else ""
    //客户端 IP
    val remoteAddr = if (valuesLength > 6) values(6) else ""
    //客户端 UA
    val httpUserAgent = if (valuesLength > 7) values(7) else ""
    //服务器时间的 ISO8610 格式
    val timeIso8601 = if (valuesLength > 8) values(8) else ""
    //服务器地址
    val serverAddr = if (valuesLength > 9) values(9) else ""
    //Cookie 信息
    //原始信息中获取 Cookie 字符串,去掉空格,制表符
    val cookiesStr = CsairStringUtils.trimSpacesChars(if (valuesLength > 10) values(10) else "")
    //提取 Cookie 信息并保存为 K-V 形式
    val cookieMap = {
      var tempMap = new scala.collection.mutable.HashMap[String, String]
      if (!cookiesStr.equals("")) {
        cookiesStr.split(";").foreach { s =>
          val kv = s.split("=")
          //UTF8 解码
          if (kv.length > 1) {
            try {
              val chPattern = Pattern.compile("u([0-9a-fA-F]{4})")
              val chMatcher = chPattern.matcher(kv(1))
              var isUnicode = false
              while (chMatcher.find()) {
                isUnicode = true
              }
              if (isUnicode) {
                tempMap += (kv(0) -> EscapeToolBox.unescape(kv(1)))
              } else {
                tempMap += (kv(0) -> RequestDecoder.decodePostRequest(kv(1)))
              }
            } catch {
              case e: Exception => e.printStackTrace()
            }
          }
        }
      }
      tempMap
    }
    //Cookie 关键信息解析
    //从配置文件读取 Cookie 配置信息
    val cookieKey_JSESSIONID = PropertiesUtil.getStringByKey("cookie.JSESSIONID.key", "cookieConfig.properties")
    val cookieKey_userId4logCookie = PropertiesUtil.getStringByKey("cookie.userId.key", "cookieConfig.properties")
    //Cookie-JSESSIONID
    val cookieValue_JSESSIONID = cookieMap.getOrElse(cookieKey_JSESSIONID, "NULL")
    //Cookie-USERID-用户 ID
    val cookieValue_USERID = cookieMap.getOrElse(cookieKey_userId4logCookie, "NULL")
    (request,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr,cookieValue_JSESSIONID,cookieValue_USERID)
  }

}

主程序:

encryptedRDD.map(message => {
        //获取到消息后开始进行数据切割/打标签等操作
        //数据切割
        val (request,
        requestMethod,
        contentType,
        requestBody,
        httpReferrer,
        remoteAddr,
        httpUserAgent,
        timeIso8601,
        serverAddr,
        cookiesStr,
        cookieValue_JSESSIONID,
        cookieValue_USERID) = DataSplit.split(message)
    
      })

数据打标签

为了方便后面的业务进行数据解析操作,必须知道当前的信息是一个什么样的请求,比如是国内/查询/单程,还是国际/查询/往返,

分类打标签

  1. 去数据库中查询分类规则信息

    /**
    * 查询标签规则的数据
    */
    def getClassifyRule(): Map[String, ArrayBuffer[String]] = {
    //获取"国内查询"的所有URL
    val nationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Query.id
    val nationalQueryList: ArrayBuffer[String] = QueryDB.queryData(nationalQuerySQL, "expression")
    //获取"国内预定"的所有URL
    val nationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Book.id
    val nationalBookList: ArrayBuffer[String] = QueryDB.queryData(nationalBookSQL, "expression")
    //获取"国际查询"的所有URL
    val internationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Query.id
    val internationalQueryList: ArrayBuffer[String] = QueryDB.queryData(internationalQuerySQL, "expression")
    //获取"国际预定"的所有URL
    val internationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Book.id
    val internationalBookList: ArrayBuffer[String] = QueryDB.queryData(internationalBookSQL, "expression")

     //定义一个Map,用来封装上面的4个集合
    
     val map = Map[String, ArrayBuffer[String]](
       "nationalQueryList" -> nationalQueryList,
       "nationalBookList" -> nationalBookList,
       "internationalQueryList" -> internationalQueryList,
       "internationalBookList" -> internationalBookList
     )
     map

    }

  2. 加载分类规则到广播变量

     //将分类规则加载到广播变量
     val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
     @volatile var classifyRuleBroadcast: Broadcast[Map[String, ArrayBuffer[String]]] = sc.broadcast(classifyRuleMap)
  3. 更新广播变量

    //更新分类规则信息
    var classifyRuleChangeFlag: String = jedis.get("ClassifyRuleChangeFlag")
    //先判断classifyRuleChangeFlag是否为空
    if (StringUtils.isBlank(classifyRuleChangeFlag)){
    classifyRuleChangeFlag = "true"
    //重新设置到Redis中
    jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
    }
    if (classifyRuleChangeFlag.toBoolean) {
    classifyRuleBroadcast.unpersist()
    //将分类规则加载到广播变量
    val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
    classifyRuleBroadcast = sc.broadcast(classifyRuleMap)
    classifyRuleChangeFlag = "false"
    //重新设置到Redis中
    jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
    }

  4. 根据广播变量中的规则对当前请求打标签

    package com.air.antispider.stream.dataprocess.businessprocess

    import com.air.antispider.stream.common.bean.RequestType
    import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum}
    import com.air.antispider.stream.dataprocess.constants.FlightTypeEnum.FlightTypeEnum

    import scala.collection.mutable.ArrayBuffer

    object RequestTypeClassifier {
    /**
    * 对请求的分类进行判断
    * @param request
    * @param classifyRuleMap
    * @return 用户的请求分类信息(国内,查询)
    */
    def requestTypeClassifier(request: String, classifyRuleMap: Map[String, ArrayBuffer[String]]): RequestType = {
    //取出分类集合中的数据
    val nationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalQueryList", null)
    val nationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalBookList", null)
    val internationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalQueryList", null)
    val internationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalBookList", null)

     //变量这4个集合,看当前的request在哪个集合中匹配
     //国内查询
     if (nationalQueryList != null) {
       //      fira code
       for (expression <- nationalQueryList) {
         //判断当前请求的URL是否和本正则匹配
         if (request.matches(expression)) {
           return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Query)
         }
       }
     }
     //国内预定
     if (nationalBookList != null) {
       //      fira code
       for (expression <- nationalBookList) {
         //判断当前请求的URL是否和本正则匹配
         if (request.matches(expression)) {
           return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Book)
         }
       }
     }
     //国际查询
     if (internationalQueryList != null) {
       //      fira code
       for (expression <- internationalQueryList) {
         //判断当前请求的URL是否和本正则匹配
         if (request.matches(expression)) {
           return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Query)
         }
       }
     }
     //国际预定
     if (internationalBookList != null) {
       //      fira code
       for (expression <- internationalBookList) {
         //判断当前请求的URL是否和本正则匹配
         if (request.matches(expression)) {
           return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Book)
         }
       }
     }
    
     //如果上面没有任何一个匹配上,那么返回一个默认值
     return RequestType(FlightTypeEnum.Other, BehaviorTypeEnum.Other)

    }
    }

5.在主程序中引用打标签的方法

//对请求的分类进行打标签操作
        val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)

往返类型打标签

用户请求信息中没有携带往返类型信息,我们需要需要用HttpReferrer中获取日期数量来判断往返类型,如果日期个数为1,单程.如果个数为2,往返

编写代码:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}

import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

/**
  * 往返信息打标签
  */
object TravelTypeClassifier {

  def travelTypeClassifier(httpReferrer: String): TravelTypeEnum = {
    val pattern: Pattern = Pattern.compile("(\\d{4})-(0\\d{1}|1[0-2])-(0\\d{1}|[12]\\d{1}|3[01])")
    val matcher: Matcher = pattern.matcher(httpReferrer)

    //创建一个计数器
    var num = 0

    //调用find方法的时候,游标会自动向下
    while (matcher.find()) {
      num = num + 1
    }

    if (num == 1) {
      //是单程
      return TravelTypeEnum.OneWay
    } else if (num == 2) {
      //是往返
      return TravelTypeEnum.RoundTrip
    } else {
      //不知道啊
      return TravelTypeEnum.Unknown
    }
  }
}

主程序:

//对往返数据进行打标签操作
        val travelTypeEnum: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)

数据解析操作

因为先有南航系统,因为系统开发久远,各个模块请求参数不统一或者请求格式不统一,我们根据航线类型/操作类型/请求的URL/请求方式等信息,通过查询数据库中的analyzerule表信息,获取解析规则,通过数据库配置好的解析规则来进行数据解析,

此处主要确定2个内容:1. 解析方式,比如使用json解析还是使用XML方式解析. 2. 确定需要解析哪些字段

加载数据库解析规则

从反扒参考资料\工具包\解析类\AnalyzeRuleDB.scala中找到获取解析规则的方法:queryRule

/**
    * 查询"查询规则"或者“预定规则”正则表达式,添加到广播变量
    *
    * @return
    */
  def queryRule(behaviorType: Int): List[AnalyzeRule] = {
    //mysql中解析规则(0-查询,1-预订)数据
    var analyzeRuleList = new ArrayBuffer[AnalyzeRule]()
    val sql: String = "select * from analyzerule where behavior_type =" + behaviorType
    var conn: Connection = null
    var ps: PreparedStatement = null
    var rs: ResultSet = null
    try {
      conn = c3p0Util.getConnection
      ps = conn.prepareStatement(sql)
      rs = ps.executeQuery()
      while (rs.next()) {
        val analyzeRule = new AnalyzeRule()
        analyzeRule.id = rs.getString("id")
        analyzeRule.flightType = rs.getString("flight_type").toInt
        analyzeRule.BehaviorType = rs.getString("behavior_type").toInt
        analyzeRule.requestMatchExpression = rs.getString("requestMatchExpression")
        analyzeRule.requestMethod = rs.getString("requestMethod")
        analyzeRule.isNormalGet = rs.getString("isNormalGet").toBoolean
        analyzeRule.isNormalForm = rs.getString("isNormalForm").toBoolean
        analyzeRule.isApplicationJson = rs.getString("isApplicationJson").toBoolean
        analyzeRule.isTextXml = rs.getString("isTextXml").toBoolean
        analyzeRule.isJson = rs.getString("isJson").toBoolean
        analyzeRule.isXML = rs.getString("isXML").toBoolean
        analyzeRule.formDataField = rs.getString("formDataField")
        analyzeRule.book_bookUserId = rs.getString("book_bookUserId")
        analyzeRule.book_bookUnUserId = rs.getString("book_bookUnUserId")
        analyzeRule.book_psgName = rs.getString("book_psgName")
        analyzeRule.book_psgType = rs.getString("book_psgType")
        analyzeRule.book_idType = rs.getString("book_idType")
        analyzeRule.book_idCard = rs.getString("book_idCard")
        analyzeRule.book_contractName = rs.getString("book_contractName")
        analyzeRule.book_contractPhone = rs.getString("book_contractPhone")
        analyzeRule.book_depCity = rs.getString("book_depCity")
        analyzeRule.book_arrCity = rs.getString("book_arrCity")
        analyzeRule.book_flightDate = rs.getString("book_flightDate")
        analyzeRule.book_cabin = rs.getString("book_cabin")
        analyzeRule.book_flightNo = rs.getString("book_flightNo")
        analyzeRule.query_depCity = rs.getString("query_depCity")
        analyzeRule.query_arrCity = rs.getString("query_arrCity")
        analyzeRule.query_flightDate = rs.getString("query_flightDate")
        analyzeRule.query_adultNum = rs.getString("query_adultNum")
        analyzeRule.query_childNum = rs.getString("query_childNum")
        analyzeRule.query_infantNum = rs.getString("query_infantNum")
        analyzeRule.query_country = rs.getString("query_country")
        analyzeRule.query_travelType = rs.getString("query_travelType")
        analyzeRule.book_psgFirName = rs.getString("book_psgFirName")
        analyzeRuleList += analyzeRule
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      c3p0Util.close(conn, ps, rs)
    }
    analyzeRuleList.toList
  }

将规则放入广播变量

    //加载解析规则信息到广播变量
    val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
    val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
    @volatile var queryRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(queryRuleList)
    @volatile var bookRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(bookRuleList)

更新广播变量

      //更新解析规则信息
      var analyzeRuleChangeFlag: String = jedis.get("AnalyzeRuleChangeFlag")
      //先判断classifyRuleChangeFlag是否为空
      if (StringUtils.isBlank(analyzeRuleChangeFlag)){
        analyzeRuleChangeFlag = "true"
        //重新设置到Redis中
        jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
      }
      if (analyzeRuleChangeFlag.toBoolean) {
        queryRuleBroadcast.unpersist()
        bookRuleBroadcast.unpersist()
        //将解析规则加载到广播变量
        //加载解析规则信息到广播变量
        val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
        val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
        queryRuleBroadcast = sc.broadcast(queryRuleList)
        bookRuleBroadcast = sc.broadcast(bookRuleList)

        analyzeRuleChangeFlag = "false"
        //重新设置到Redis中
        jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
      }

编写解析规则代码

将反扒参考资料\工具包\解析类路径下的AnalyzeBookRequest和AnalyzeRequest2个类拷贝到com.air.antispider.stream.dataprocess.businessprocess包下

主程序调用

 //开始解析数据
        //解析查询数据
        val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelTypeEnum,
          queryRuleBroadcast.value)
        //解析预定数据
        val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelTypeEnum,
          bookRuleBroadcast.value
        )

数据加工

提前将本次访问的IP和MySQL中的黑名单数据进行比对,判断当前的IP是否是一个高频IP,如果是高频IP,那么就打个标记,让后续业务使用.

  1. 加载MySQL中的黑名单数据

    /**
    * 查询MySQL数据库中的黑名单数据
    * @return
    */
    def getIpBlackList (): ArrayBuffer[String] = {
    val sql = "select ip_name from nh_ip_blacklist"
    val blackIPList: ArrayBuffer[String] = QueryDB.queryData(sql, "ip_name")
    blackIPList
    }

  2. 将黑名单数据放入广播变量

     //将黑名单数据加载到广播变量
     val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
     @volatile var blackIPBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(blackIPList)
  3. 更新广播变量的黑名单数据

       //更新黑名单信息
       var blackIPChangeFlag: String = jedis.get("BlackIPChangeFlag")
       //先判断classifyRuleChangeFlag是否为空
       if (StringUtils.isBlank(blackIPChangeFlag)){
         blackIPChangeFlag = "true"
         //重新设置到Redis中
         jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
       }
       if (blackIPChangeFlag.toBoolean) {
         blackIPBroadcast.unpersist()
         //将黑名单数据加载到广播变量
         val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
         blackIPBroadcast = sc.broadcast(blackIPList)
    
         blackIPChangeFlag = "false"
         //重新设置到Redis中
         jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
       }
  4. 编写判断高频IP代码

    package com.air.antispider.stream.dataprocess.businessprocess

    import scala.collection.mutable.ArrayBuffer

    object IpOperation {
    /**
    * 判断当前客户端IP是否是高频IP
    * @param remoteAddr
    * @param blackIPList
    * @return
    */
    def operationIP(remoteAddr: String, blackIPList: ArrayBuffer[String]): Boolean = {
    //遍历blackIPList,判断remoteAddr在集合中是否存在
    for (blackIP <- blackIPList) {
    if (blackIP.equals(remoteAddr)){
    //如果相等,当前IP是高频IP
    return true
    }
    }
    return false
    }
    }

  5. 主程序代码

    //数据加工操作
    val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)

数据结构化

前面获取/计算的数据都是散乱的,没办法交给后面的业务进行处理,所以需要进行封装为结构化数据.

代码编写:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.{BookRequestData, CoreRequestParams, ProcessedData, QueryRequestData, RequestType}
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

object DataPackage {
  /**
    * 对下方散乱的数据,进行封装,封装为ProcessedData
    * @param source
    * @param requestMethod
    * @param request
    * @param remoteAddr
    * @param httpUserAgent
    * @param timeIso8601
    * @param serverAddr
    * @param highFrqIPGroup
    * @param requestType
    * @param travelType
    * @param cookieValue_JSESSIONID
    * @param cookieValue_USERID
    * @param queryParams
    * @param bookParams
    * @param httpReferrer
    * @return
    */
  def dataPackage(sourceData: String,
                  requestMethod: String,
                  request: String,
                  remoteAddr: String,
                  httpUserAgent: String,
                  timeIso8601: String,
                  serverAddr: String,
                  highFrqIPGroup: Boolean,
                  requestType: RequestType,
                  travelType: TravelTypeEnum,
                  cookieValue_JSESSIONID: String,
                  cookieValue_USERID: String,
                  queryParams: Option[QueryRequestData],
                  bookParams: Option[BookRequestData],
                  httpReferrer: String): ProcessedData = {

    //因为创建ProcessedData的时候,还需要核心请求参数,
    //但这些参数在queryParams/bookParams中

    //定义出发时间/始发地/目的地等参数
    var flightDate: String = ""
    //出发地
    var depcity: String = ""
    //目的地
    var arrcity: String = ""

   //看查询请求参数中有没有值
    queryParams match {
      //Option有值的情况,queryData:如果有值,就使用此变量操作
      case Some(x) =>
        flightDate = x.flightDate
        depcity = x.depCity
        arrcity = x.arrCity
      //None:没有值
      case None =>
        //如果查询请求参数没有值,就去预定请求参数中获取
        bookParams match {
          //Option有值的情况,queryData:如果有值,就使用此变量操作
          case Some(bookData) =>
            //为了确保安全,需要加上长度判断,只有长度大于0才能这样取值
            flightDate = bookData.flightDate.mkString
            depcity = bookData.depCity.mkString
            arrcity = bookData.arrCity.mkString
          //None:没有值
          case None =>
        }
    }

    //创建核心请求参数
    val requestParams = CoreRequestParams(flightDate, depcity, arrcity)

    ProcessedData(
      sourceData,
      requestMethod,
      request,
      remoteAddr,
      httpUserAgent,
      timeIso8601,
      serverAddr,
      highFrqIPGroup,
      requestType,
      travelType,
      requestParams,
      cookieValue_JSESSIONID,
      cookieValue_USERID,
      queryParams,
      bookParams,
      httpReferrer)
  }
}

主程序代码

//进行数据信息提取/转换等操作,得到ProcessedDataRDD
      val processedDataRDD: RDD[ProcessedData] = encryptedRDD.map(message => {
        //获取到消息后开始进行数据切割/打标签等操作
        //数据切割
        val (request, //请求URL
        requestMethod,
        contentType,
        requestBody, //请求体
        httpReferrer, //来源URL
        remoteAddr, //客户端IP
        httpUserAgent,
        timeIso8601,
        serverAddr,
        cookiesStr,
        cookieValue_JSESSIONID,
        cookieValue_USERID) = DataSplit.split(message)

        //对请求的分类进行打标签操作
        val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)
        //对往返数据进行打标签操作
        val travelType: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)
        //开始解析数据
        //解析查询数据
        val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelType,
          queryRuleBroadcast.value)
        //解析预定数据
        val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelType,
          bookRuleBroadcast.value
        )
        //数据加工操作
        val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)
        //对上面的散乱数据进行封装
        val processedData: ProcessedData = DataPackage.dataPackage(
          "", //原始数据,此处直接置为空
          requestMethod,
          request,
          remoteAddr,
          httpUserAgent,
          timeIso8601,
          serverAddr,
          highFrqIPGroup,
          requestType,
          travelType,
          cookieValue_JSESSIONID,
          cookieValue_USERID,
          queryParams,
          bookParams,
          httpReferrer)
        processedData
      })

数据推送模块

为了实现更好的解耦,在数据推送的时候,会根据请求具体的类型,比如查询/预定,发送到不同的Topic.后面的业务,就很近自己的需要去拉取自己的消息

代码编写:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum.BehaviorTypeEnum
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.rdd.RDD

object SendData {
  /**
    * 发送预定数据到Kafka
    *
    * @param processedDataRDD
    */
  def sendBookDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 1)
  }

  /**
    * 发送查询数据到Kafka
    *
    * @param processedDataRDD
    */
  def sendQueryDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 0)
  }

  /**
    * 根据指定的类型,发送到Kafka
    *
    * @param processedDataRDD
    * @param topicType 0: 查询,1: 预定
    */
  def sendToKafka(processedDataRDD: RDD[ProcessedData], topicType: Int) = {
    //将processedData数据发送到Kafka中
    val messageRDD: RDD[String] = processedDataRDD
      //根据类型进行过滤
      .filter(processedData => processedData.requestType.behaviorType.id == topicType)
      //将数据转换为字符串
      .map(processedData => processedData.toKafkaString())

    //如果经过过滤操作后,还有数据,那么就发送
    if (!messageRDD.isEmpty()) {
      //定义Kafka相关配置
      //查询数据的 topic:target.query.topic = processedQuery
      var topicKey = ""
      if (topicType == 0) {
        topicKey = "target.query.topic"
      } else if (topicType == 1) {
        topicKey = "target.book.topic"
      }
      val queryTopic = PropertiesUtil.getStringByKey(topicKey, "kafkaConfig.properties")
      //创建 map 封装 kafka 参数
      val props = new java.util.HashMap[String, Object]()
      //设置 brokers
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties"))
      //key 序列化方法
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties"))
      //value 序列化方法
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties"))
      //批发送设置:32KB 作为一批次或 10ms 作为一批次
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties"))
      props.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties"))

      messageRDD.foreachPartition(iter => {
        //先创建Kafka连接
        val producer = new KafkaProducer[String, String](props)
        //发送数据
        iter.foreach(message => {
          //发送数据
          producer.send(new ProducerRecord[String, String](queryTopic, message))
        })
        //关闭Kafka连接
        producer.close()
      })
    }
  }
}

主程序:

//将结构化的数据ProcessedData根据不同的请求发送到不同的Topic中
//发送查询数据到Kafka
SendData.sendQueryDataKafka(processedDataRDD)
//发送预定数据到Kafka
SendData.sendBookDataKafka(processedDataRDD)

任务实时监控

Spark自带有性能监控功能,需要在创建SparkConf的时候开启:

 //当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //创建Spark配置对象
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DataProcessApp")
      //开启Spark性能监控功能
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

在浏览器上可以通过:http://localhost:4040/metrics/json/访问

代码编写

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import com.air.antispider.stream.common.util.spark.SparkMetricsUtils
import com.alibaba.fastjson.JSONObject
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

object SparkStreamingMonitor {
  /**
    * Spark性能监控,
    *
    * @param sc
    * @param processedDataRDD
    * @param serversCountMap
    */
  def streamMonitor(sc: SparkContext, processedDataRDD: RDD[ProcessedData], serversCountMap: collection.Map[String, Int]) = {

    //1. 获取到Spark的状态信息
    /*
    //在项目上线后,使用下方的方式获取URL
        //监控数据获取
        val sparkDriverHost =
          sc.getConf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY _URI_BASES")
        //在 yarn 上运行的监控数据 json 路径
        val url = s"${sparkDriverHost}/metrics/json"
        */
    val url = "http://localhost:4040/metrics/json/"
    val sparkDataInfo: JSONObject = SparkMetricsUtils.getMetricsJson(url)
    val gaugesObj: JSONObject = sparkDataInfo.getJSONObject("gauges")

    //获取应用ID和应用名称,用来构建json中的key
    val id: String = sc.applicationId
    val appName: String = sc.appName
    //local-1561617727065.driver.DataProcessApp.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime
    val startKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingStartTime"
    val endKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingEndTime"

    val startTime = gaugesObj.getJSONObject(startKey) //{"value": 1561617812011}
      .getLong("value")
    val endTime = gaugesObj.getJSONObject(endKey) //{"value": 1561617812011}
      .getLong("value")
    //将结束时间进行格式化yyyy-MM-dd HH:mm:ss,注意,web平台使用的是24小时制,所以此处需要使用HH
    val endTimeStr: String = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(endTime)

    //2. 计算时间差
    val costTime = endTime - startTime

    //3. 根据时间差计算数据处理速度,速度= 数量/时间
    //获取处理的数量
    val count: Long = processedDataRDD.count()
    //计算处理速度
    var countPer = 0.0
    if (costTime != 0) {
      countPer = count / costTime
    }


    //4. 交给JavaWeb进行结果展示
    //对serversCountMap进行转换,转换为JSON
    val serversCountMapJson: String = Json(DefaultFormats).write(serversCountMap)

    //根据web平台的代码,发现需要存入Redis中
    val message = Map[String, Any](
      "costTime" -> costTime.toString, //时间差
      "applicationId" -> id, //应用ID
      "applicationUniqueName" -> appName, //应用名称
      "countPerMillis" -> countPer.toString,//计算速度
      "endTime" -> endTimeStr, //结束时间:2019-06-27 15:44:32
      "sourceCount" -> count.toString, //数据的数量
      "serversCountMap" -> serversCountMap //数据采集信息
    )

   //将message转换为json
   val messageJson: String = Json(DefaultFormats).write(message)

    //将messageJson发送到Kafka

    val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
    //存入Redis的Key.CSANTI_MONITOR_DP + 时间戳
    val key = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + System.currentTimeMillis()
    val ex = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
    jedis.setex(key, ex, messageJson)


    //如果需要最后一批数据,那么可以使用下面的方式,
    val lastKey = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + "_LAST"
    jedis.set(lastKey, messageJson)

  }
}

主程序代码:

因为第三个参数serversCountMap涉及到了之前的链路统计,所以需要修改链路统计的返回值

//开启Spark性能监控
//SparkContext, 数据集RDD, 数据采集结果信息
SparkStreamingMonitor.streamMonitor(sc, processedDataRDD, serversCountMap)

实时计算

自定义维护Offset

读取偏移量代码:

package com.air.antispider.stream.rulecompute

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.kafka.KafkaOffsetUtil
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 黑名单实时计算主程序
  */
object RuleComputeApp {


  def main(args: Array[String]): Unit = {
    //创建Spark执行环境
    //当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //创建Spark配置对象
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RuleComputeApp")
      //开启Spark性能监控功能
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
    //创建SparkContext
    val sc = new SparkContext(sparkConf)
    //创建SparkStreamingContext对象
    val ssc = new StreamingContext(sc, Seconds(2))
  
    val inputStream: InputDStream[(String, String)] = createKafkaStream(ssc)

    inputStream.print()

    //启动程序
    ssc.start()
    ssc.awaitTermination()
  }


  /**
    * 消费Kafka数据,创建InputStream对象
    * @param ssc
    * @return
    */
  def createKafkaStream(ssc: StreamingContext): InputDStream[(String, String)] = {
    //连接Kafka
    //封装Kafka参数信息
    var kafkaParams = Map[String, String]()
    //从kafkaConfig.properties配置文件中获取broker列表信息
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")
    kafkaParams += ("metadata.broker.list" -> brokerList)

    //zookeeper主机地址
    val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
    //topic信息存储位置
    val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")
    //topic
    val topic: String = PropertiesUtil.getStringByKey("source.query.topic", "kafkaConfig.properties")
    //封装topic的集合
    val topics = Set[String](topic)
    //创建zk客户端对象
    val zkClient = new ZkClient(zkHosts)


    //使用KafkaOffsetUtil来获取TopicAndPartition数据
    val topicAndPartitionOption: Option[Map[TopicAndPartition, Long]] = KafkaOffsetUtil.readOffsets(zkClient, zkHosts, zkPath, topic)

    val inputStream: InputDStream[(String, String)] = topicAndPartitionOption match {
      //如果有数据:从Zookeeper中读取偏移量
      case Some(topicAndPartition) =>
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, topicAndPartition, messageHandler)
      //如果没有数据,还按照以前的方式来读取数据
      case None => KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
    inputStream
  }
}

保存偏移量代码:

    //将数据偏移量到zookeeper中
    inputStream.foreachRDD(rdd => {
      //保存偏移量
      saveOffsets(rdd)
    })

/**
    * 保存偏移量信息
    * @param rdd
    */
  def saveOffsets(rdd: RDD[(String, String)]): Unit = {
    //zookeeper主机地址
    val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
    //创建zk客户端对象
    val zkClient = new ZkClient(zkHosts)
    //topic信息存储位置
    val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")

    KafkaOffsetUtil.saveOffsets(zkClient, zkHosts, zkPath, rdd)
  }

数据封装

将获取到的字符串转换为ProcessedData对象,可以直接从讲义中拷贝过来

代码:

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean._
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum, TravelTypeEnum}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.streaming.dstream.DStream

/**
  * 数据分割封装
  */
object QueryDataPackage {
  /**
    * 数据分割封装
    *
    * @param lines * @return
    */
  def queryDataLoadAndPackage(lines: DStream[String]): DStream[ProcessedData] = {
    //使用 mapPartitions 减少包装类的创建开销
    lines.mapPartitions { partitionsIterator =>
      //创建 json 解析
      val mapper = new ObjectMapper
      mapper.registerModule(DefaultScalaModule)
      //将数据进行 map,一条条处理
      partitionsIterator.map { sourceLine =>
        //分割数据
        val dataArray = sourceLine.split("#CS#", -1)
        //原始数据,站位,并无数据
        val sourceData = dataArray(0)
        val requestMethod = dataArray(1)
        val request = dataArray(2)
        val remoteAddr = dataArray(3)
        val httpUserAgent = dataArray(4)
        val timeIso8601 = dataArray(5)
        val serverAddr = dataArray(6)
        val highFrqIPGroup: Boolean = dataArray(7).equalsIgnoreCase("true")
        val requestType: RequestType = RequestType(FlightTypeEnum.withName(dataArray(8)), BehaviorTypeEnum.withName(dataArray(9)))
        val travelType: TravelTypeEnum = TravelTypeEnum.withName(dataArray(10))
        val requestParams: CoreRequestParams = CoreRequestParams(dataArray(11), dataArray(12), dataArray(13))
        val cookieValue_JSESSIONID: String = dataArray(14)
        val cookieValue_USERID: String = dataArray(15)
        //分析查询请求的时候不需要 book 数据
        val bookRequestData: Option[BookRequestData] = None
        //封装 query 数据
        val queryRequestData = if (!dataArray(16).equalsIgnoreCase("NULL")) {
          mapper.readValue(dataArray(16), classOf[QueryRequestData]) match {
            case value if value != null => Some(value)
            case _ => None
          }
        } else {
          None
        }
        val httpReferrer = dataArray(18)
        //封装流程数据,返回
        ProcessedData("", requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, requestParams, cookieValue_JSESSIONID, cookieValue_USERID, queryRequestData, bookRequestData, httpReferrer)
      }
    }
  }
}

主程序:

    //从inputStream中取出消息
    val dStream: DStream[String] = inputStream.map(_._2)
    //将消息转换为ProcessedData对象
    val processedDataDStream: DStream[ProcessedData] = QueryDataPackage.queryDataLoadAndPackage(dStream)

加载规则

从MySQL中获取:1. 关键页面 2. 黑名单IP 3. 流程规则

  1. 从数据库中查询到所有的流程
  2. 关联查询到每个流程的策略分值
  3. 查询每个流程对应的规则信息
  4. 查询每个规则的真实名称

    /**
    * 获取流程列表
    * 参数n为0为反爬虫流程
    参数n为1为防占座流程

    * @return ArrayBuffer[FlowCollocation]
    */
    def createFlow(n:Int) :ArrayBuffer[FlowCollocation] = {
    var array = new ArrayBuffer[FlowCollocation]
    var sql:String = ""
    if(n == 0){ sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.crawler_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=0"}
    else if(n == 1){sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.occ_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=1"}

     var conn: Connection = null
     var ps: PreparedStatement = null
     var rs:ResultSet = null
     try{
       conn = c3p0Util.getConnection
       ps = conn.prepareStatement(sql)
       rs = ps.executeQuery()
       while (rs.next()) {
         val flowId = rs.getString("id")
         val flowName = rs.getString("process_name")
         if(n == 0){
           val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
           array += new FlowCollocation(flowId, flowName,createRuleList(flowId,n), flowLimitScore, flowId)
         }else if(n == 1){
           val flowLimitScore = rs.getDouble("occ_blacklist_thresholds")
           array += new FlowCollocation(flowId, flowName,createRuleList(flowId,n), flowLimitScore, flowId)
         }
    
       }
     }catch{
       case e : Exception => e.printStackTrace()
     }finally {
       c3p0Util.close(conn, ps, rs)
     }
     array

    }

    /**
    * 获取规则列表

    @param process_id 根据该ID查询规则
    * @return list列表
    /
    def createRuleList(process_id:String,n:Int):List[RuleCollocation] = {
    var list = new ListBuffer[RuleCollocation]
    val sql = "select
    from(select nh_rule.id,nh_rule.process_id,nh_rules_maintenance_table.rule_real_name,nh_rule.rule_type,nh_rule.crawler_type,"+
    "nh_rule.status,nh_rule.arg0,nh_rule.arg1,nh_rule.score from nh_rule,nh_rules_maintenance_table where nh_rules_maintenance_table."+
    "rule_name=nh_rule.rule_name) as tab where process_id = '"+process_id + "'and crawler_type="+n
    //and status="+n
    var conn: Connection = null
    var ps: PreparedStatement = null
    var rs:ResultSet = null
    try{
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while ( rs.next() ) {
    val ruleId = rs.getString("id")
    val flowId = rs.getString("process_id")
    val ruleName = rs.getString("rule_real_name")
    val ruleType = rs.getString("rule_type")
    val ruleStatus = rs.getInt("status")
    val ruleCrawlerType = rs.getInt("crawler_type")
    val ruleValue0 = rs.getDouble("arg0")
    val ruleValue1 = rs.getDouble("arg1")
    val ruleScore = rs.getInt("score")
    val ruleCollocation = new RuleCollocation(ruleId,flowId,ruleName,ruleType,ruleStatus,ruleCrawlerType,ruleValue0,ruleValue1,ruleScore)
    list += ruleCollocation
    }
    }catch {
    case e : Exception => e.printStackTrace()
    }finally {
    c3p0Util.close(conn, ps, rs)
    }
    list.toList
    }

FlowCollocation``RuleCollocation需要从反扒参考资料\工具包\ruleComputeBean中拷贝到项目中

将流程信息放入广播变量

    //将流程数据加载到广播变量
    val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow()
    @volatile var flowCollocationsBroadcast: Broadcast[ArrayBuffer[FlowCollocation]] = sc.broadcast(flowCollocations)

更新广播变量

  //更新流程的广播变量flowCollocationsBroadcast
      var flowCollocationChangeFlag: String = jedis.get("flowCollocationChangeFlag")
      //先判断classifyRuleChangeFlag是否为空
      if (StringUtils.isBlank(flowCollocationChangeFlag)){
        flowCollocationChangeFlag = "true"
        //重新设置到Redis中
        jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
      }
      if (flowCollocationChangeFlag.toBoolean) {
        flowCollocationsBroadcast.unpersist()
        //将黑名单数据加载到广播变量
        val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow()
        flowCollocationsBroadcast = sc.broadcast(flowCollocations)

        flowCollocationChangeFlag = "false"
        //重新设置到Redis中
        jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
      }

规则计算

IP段指标计算

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import org.apache.spark.rdd.RDD

/**
  * 按照不同的维度进行计算的工具类
  */
object CoreRule {
  /**
    * IP段指标计算
    * @param processedDataRDD
    */
  def ipBlockCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    val mapRDD: RDD[(String, Int)] = processedDataRDD.map(processedData => {
      //获取客户端IP 192.168.80.81
      val ip: String = processedData.remoteAddr
      //获取IP的前2位, 192.168
      val arr: Array[String] = ip.split("\\.")
      if (arr.length == 4) {
        //代表这是一个完整的IP
        val ipBlock = arr(0) + "." + arr(1)
        //(ip段, 1)
        (ipBlock, 1)
      } else {
        ("", 1)
      }
    })
    //按照IP段进行分组,聚合计算
    val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((x, y) => x + y)
    //将结果采集为Map类型返回
    resultRDD.collectAsMap()
  }
}

主程序:

  //开始根据各个指标维度进行计算
  //计算IP段的访问量
val ipBlockCountMap: collection.Map[String, Int] = CoreRule.ipBlockCount(processedDataRDD)

IP访问量

代码:

/**
    * 计算IP5分钟访问量
    * @param processedDataRDD
    * @return
    */
  def ipCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      //(ip, 次数)
      (ip, 1)
    })
      //累加
      .reduceByKey(_ + _)
      //采集数据
      .collectAsMap()
  }

IP对关键页面的访问量

/**
    * 计算IP访问关键页面的次数
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      //定义访问次数,默认为0次
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)){
          //如果匹配上,代表访问了1次关键页面
          count = 1
        }
      }
      (ip, count)
    })
      //累加
      .reduceByKey(_ + _)
      //采集数据
      .collectAsMap()
  }

IP携带不同UA的个数

/**
    * 计算IP5分钟携带不同UA的个数
    * @param processedDataRDD
    * @return
    */
  def ipUACount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    //将processedData转换为(ip, ua)的格式
    val mapData: RDD[(String, String)] = processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val ua: String = processedData.httpUserAgent
      (ip, ua)
    })
    //(ip, ua) => (ip, (ua1, ua2, ua1))的格式
    val groupRDD: RDD[(String, Iterable[String])] = mapData.groupByKey()
    //将(ip, (ua1, ua2, ua1))的格式 转换为 (ip, 次数)的格式
    groupRDD.map(line => {
      val ip: String = line._1
      val sourceData: Iterable[String] = line._2
      //创建一个Set集合,将原始的数据放入集合中,去重
      var set = Set[String]()
      for (ua <- sourceData) {
        //将ua放入set集合
        set += ua
      }
      (ip, set.size)
    })
      .collectAsMap()

  }

IP访问关键页面最小时间间隔

/**
    * 计算IP5分钟访问关键页面最小时间间隔
    *
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesMinTimeCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Long] = {
    //先过滤出关键页面
    processedDataRDD
      //过滤
      .filter(processedData => {
      val url: String = processedData.request
      //定义访问次数,默认为0次
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //如果匹配上,代表访问了1次关键页面
          count = 1
        }
      }
      //如果count == 1,代表当前访问的是关键页面,返回true
      if (count == 0) {
        false
      } else {
        true
      }
    })
      //转换,获取(ip,时间戳)
      .map(processedData => {
        val ip: String = processedData.remoteAddr
        val time: String = processedData.timeIso8601
        //time的格式2019-06-29T08:46:56+08:00
        val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
        (ip, timeStamp)
      })
      //分组(ip,(时间1,时间2,时间3...))
      .groupByKey()
      //转换,为了获取(IP,最小时间差)
      .map(line => {
        val ip: String = line._1
        //封装所有时间的迭代器对象
        val sourceData: Iterable[Long] = line._2
        //将迭代器对象转换为Array
        val sourceArray: Array[Long] = sourceData.toArray
        //将原始数据进行排序
        util.Arrays.sort(sourceArray)
        //定义一个用于存储差值的集合
        var resultArray = new ArrayBuffer[Long]()
        for (i <- 0 until sourceArray.size - 1) {
          //当前元素
          val currentTime: Long = sourceArray(i)
          //下一个元素
          val nexTime: Long = sourceArray(i + 1)
          val result = nexTime - currentTime
          //将差值存入集合
          resultArray += result
        }
        //将差值结果进行排序
        val array: Array[Long] = resultArray.toArray
        util.Arrays.sort(array)
        (ip, array(0))
      })
      //采集数据
      .collectAsMap()
  }

IP访问关键页面时间间隔小于预设时间的次数

代码:

/**
    * 计算IP5分钟访问关键页面最小时间间隔小于预设值的次数
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesMinNumCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[(String, String), Int] = {
    //先过滤出关键页面
    processedDataRDD
      //过滤
      .filter(processedData => {
      val url: String = processedData.request
      //定义访问次数,默认为0次
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //如果匹配上,代表访问了1次关键页面
          count = 1
        }
      }
      //如果count == 1,代表当前访问的是关键页面,返回true
      if (count == 0) {
        false
      } else {
        true
      }
    })
      //转换,获取((IP, URL),时间戳)
      .map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      val time: String = processedData.timeIso8601
      //time的格式2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      ((ip, url), timeStamp)
    })
      //分组((IP, URL),(时间1,时间2,时间3...))
      .groupByKey()
      //转换,为了获取(IP,最小时间差)
      .map(line => {
      val key: (String, String) = line._1
      //封装所有时间的迭代器对象
      val sourceData: Iterable[Long] = line._2
      //将迭代器对象转换为Array
      val sourceArray: Array[Long] = sourceData.toArray
      //将原始数据进行排序
      util.Arrays.sort(sourceArray)
      //定义一个用于存储差值的集合
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //当前元素
        val currentTime: Long = sourceArray(i)
        //下一个元素
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //将小于预设值的差值存入集合(此处直接写死5秒钟)
        if (result < 5000) {
          resultArray += result
        }
      }
      //返回((ip, url), 次数)
      (key, resultArray.size)
    })
      .collectAsMap()
  }

计算IP5分钟查询不同航班的次数

/**
    * 计算IP5分钟查询不同航班的次数
    * @param processedDataRDD
    * @return
    */
  def ipCityCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    //(ip , 出发地->目的地)
    processedDataRDD.map(line => {
      val ip: String = line.remoteAddr
      //出发地
      val depcity: String = line.requestParams.depcity
      //目的地
      val arrcity: String = line.requestParams.arrcity
      (ip, depcity + "->" + arrcity)
    })
      .groupByKey()
      //(ip, 不同城市的次数)
      .map(line => {
        val ip: String = line._1
        val sourceCitys: Iterable[String] = line._2
        //定义Set集合实现去重
        var set = Set[String]()
        //循环,去重
        for (city <- sourceCitys) {
          set += city
        }
        (ip, set.size)
      })
      .collectAsMap()
  }

IP5分钟携带不同Cookie的数量

/**
  * 计算IP5分钟携带不同Cookie的数量
  * @param processedDataRDD
  * @param criticalPagesList
  * @return
  */
def ipCookieCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
  //先过滤出关键页面
  processedDataRDD
    //过滤
    .filter(processedData => {
    val url: String = processedData.request
    //定义访问次数,默认为0次
    var count = 0
    for (criticalPages <- criticalPagesList) {
      if (url.matches(criticalPages)) {
        //如果匹配上,代表访问了1次关键页面
        count = 1
      }
    }
    //如果count == 1,代表当前访问的是关键页面,返回true
    if (count == 0) {
      false
    } else {
      true
    }
  })
    //(ip , jSessionID)
    .map(line => {
      val ip: String = line.remoteAddr
      //SessionID
      val sessionID: String = line.cookieValue_JSESSIONID
      (ip, sessionID)
    })
      .groupByKey()
      //(ip, (sID1, sID2, sID1))
      .map(line => {
      val ip: String = line._1
      val sourceSessionIDs: Iterable[String] = line._2
      //定义Set集合实现去重
      var set = Set[String]()
      //循环,去重
      for (sessionID <- sourceSessionIDs) {
        set += sessionID
      }
      (ip, set.size)
    })
      .collectAsMap()
}

黑名单打分计算

从数据库中加载到流程的相关信息,里面包含每个流程自己的规则列表,我们已经计算好了每个规则的数量,只需要和数据库的规则进行比对就可以得出超出范围指标打分的列表,以及开启规则的打分列表

代码:

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.{FlowCollocation, ProcessedData, RuleCollocation}
import com.air.antispider.stream.rulecompute.bean.{AntiCalculateResult, FlowScoreResult}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

object RuleUtil {



  /**
    * 通过各个规则计算流程最终结果
    *
    * @param processedDataRDD
    * @param ipBlockCountMap
    * @param ipCountMap
    * @param ipCriticalPagesMap
    * @param ipUAMap
    * @param ipCriticalPagesMinTimeMap
    * @param ipCriticalPagesMinNumMap
    * @param ipCityCountMap
    * @param ipCookieCountMap
    * @param flowCollocationList
    */
  def calculateAntiResult(
                           processedDataRDD: RDD[ProcessedData],
                           ipBlockCountMap: collection.Map[String, Int],
                           ipCountMap: collection.Map[String, Int],
                           ipCriticalPagesMap: collection.Map[String, Int],
                           ipUAMap: collection.Map[String, Int],
                           ipCriticalPagesMinTimeMap: collection.Map[String, Long],
                           ipCriticalPagesMinNumMap: collection.Map[(String, String), Int],
                           ipCityCountMap: collection.Map[String, Int],
                           ipCookieCountMap: collection.Map[String, Int],
                           flowCollocationList: ArrayBuffer[FlowCollocation]
                         ): RDD[AntiCalculateResult] = {

    //从map中获取各个指标的数据
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request

      //获取IP的前2位, 192.168
      val arr: Array[String] = ip.split("\\.")
      var ipBlock = ""
      if (arr.length == 4) {
        //代表这是一个完整的IP
        ipBlock = arr(0) + "." + arr(1)
      }
      //获取IP段的值
      val ipBlockCounts: Int = ipBlockCountMap.getOrElse(ipBlock, 0)
      //获取IP的值
      val ipCounts: Int = ipCountMap.getOrElse(ip, 0)
      //获取关键页面的值
      val ipCriticalPagesCounts: Int = ipCriticalPagesMap.getOrElse(ip, 0)
      val ipUACounts: Int = ipUAMap.getOrElse(ip, 0)
      //最小访问时间间隔,如果获取不到IP,给个Int最大值,不能给0
      val ipCriticalPagesMinTimeCounts: Int = ipCriticalPagesMinTimeMap.getOrElse(ip, Integer.MAX_VALUE).toInt
      val ipCriticalPagesMinNumCounts: Int = ipCriticalPagesMinNumMap.getOrElse((ip, url), 0)
      val ipCityCounts: Int = ipCityCountMap.getOrElse(ip, 0)
      val ipCookieCounts: Int = ipCookieCountMap.getOrElse(ip, 0)

      //定义map封装规则分值信息
      val map = Map[String, Int](
        "ipBlock" -> ipBlockCounts,
        "ip" -> ipCounts,
        "criticalPages" -> ipCriticalPagesCounts,
        "userAgent" -> ipUACounts,
        "criticalPagesAccTime" -> ipCriticalPagesMinTimeCounts,
        "criticalPagesLessThanDefault" -> ipCriticalPagesMinNumCounts,
        "flightQuery" -> ipCityCounts,
        "criticalCookies" -> ipCookieCounts
      )


      val flowsScore: Array[FlowScoreResult] = computeFlowScore(map, flowCollocationList)



      AntiCalculateResult(
        processedData,
        ip,
        ipBlockCounts,
        ipCounts,
        ipCriticalPagesCounts,
        ipUACounts,
        ipCriticalPagesMinTimeCounts,
        ipCriticalPagesMinNumCounts,
        ipCityCounts,
        ipCookieCounts,
        null
      )
    })
  }

  /**
    * 开始计算,获取最终计算结果
    * @param map
    * @param flowCollocationList
    * @return
    */
  def computeFlowScore(map: Map[String, Int], flowCollocationList: ArrayBuffer[FlowCollocation]): Array[FlowScoreResult] = {
    //因为传过来的flowCollocationList代表多个流程,所以先循环流程
    for (flow <- flowCollocationList) {
      //通过flow,获取该流程下的规则
      val rules: List[RuleCollocation] = flow.rules

      //定义集合存储超出范围的规则得分信息
      var array1 = new ArrayBuffer[Int]()
      //定义超出范围,并且处于开启状态的得分信息
      var array2 = new ArrayBuffer[Int]()

      for (rule <- rules) {
        val ruleName: String = rule.ruleName
        val num: Int = map.getOrElse(ruleName, 0)
        //如果数据库名称和计算结果名称一样,开始比较大小
        if (num > rule.ruleValue0) {
          //如果当前计算结果超出了数据库配置好的阈值范围,那么就命中该规则
          //将得分放入集合
          array1 += rule.ruleScore
          if (rule.ruleStatus == 0){
            //如果当前规则状态为开启状态
            array2 += rule.ruleScore
          }
        }
      }

//      val result = xxx(array1, array2)


    }
    null
  }

}

标签:String,val,rs,var,反扒,print,import
来源: https://www.cnblogs.com/jeasonchen001/p/11192079.html