Flink_恶意登录监控 (利用CEP)
作者:互联网
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计:具体来说,如果同一用户(可以是不同 IP)在 2 秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
本文会用到 Flink 的 CEP 库来对事件流做模式匹配;下面三种方法中,方法三(利用 CEP)是重点方法。
在 pom 文件中引入 CEP 的相关依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
输入数据:
方法一:
状态编程:
由于引入了时间,最简单的方法其实与之前的热门统计类似:只需要按照用户 ID 分流,遇到登录失败的事件时将其保存在 ListState 中,并设置一个 2 秒后触发的定时器。定时器触发时检查状态中的登录失败事件个数,如果大于等于 2,就输出报警信息。(这种做法只能在 2 秒之后去判断这期间是否有多次失败登录,而不能在一次登录失败之后、再一次登录失败时就立刻报警。)
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/**
  * Input record: a single login attempt parsed from one line of the login log.
  *
  * @param userId    id of the user the attempt belongs to (the streams are keyed by it)
  * @param ip        second CSV column of the log line (the IP, going by the field name)
  * @param eventType outcome of the attempt; the detection logic compares it against "fail"
  * @param timeStamp event time; it is multiplied by 1000 before being handed to Flink as
  *                  the event timestamp, so it is presumably expressed in epoch seconds
  */
case class LoginEvent(
  userId: Long,
  ip: String,
  eventType: String,
  timeStamp: Long
)
/**
  * Output record: the alert emitted when repeated login failures are detected for a user.
  *
  * @param userId        id of the user the alert is about
  * @param firstFailTime `timeStamp` of the first failure covered by the alert
  * @param lastFailTime  `timeStamp` of the last failure covered by the alert
  * @param warnMsg       human-readable description of the alert
  */
case class LoginFailWarning(
  userId: Long,
  firstFailTime: Long,
  lastFailTime: Long,
  warnMsg: String
)
/**
  * Method 1: timer-based login-failure detection job.
  *
  * Reads the login log as text, parses each line into a [[LoginEvent]], assigns
  * event-time timestamps and watermarks, keys the stream by user id and lets
  * [[LoginFailWarningResult]] emit a [[LoginFailWarning]] for users with at
  * least two failures inside the timer window. Alerts are printed to stdout.
  */
object LoginFail {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv")

    // Convert each CSV line into the case class, then extract timestamps and watermarks.
    val loginEventStream = inputStream
      .map { data =>
        val arr = data.split(",")
        // Plain method-call syntax: the former postfix form `arr(0) toLong` requires
        // scala.language.postfixOps and differed from the parsing in LoginFailWithCep.
        LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      }
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
          // LoginEvent.timeStamp is scaled by 1000 to obtain the millisecond timestamp Flink expects.
          override def extractTimestamp(t: LoginEvent): Long = t.timeStamp * 1000L
        })

    // Detect repeated failures per user: two or more failures within 2 seconds raise an alert.
    val loginFailWarningStream = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWarningResult(2))

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}
/**
  * Keyed process function that collects failed logins per user and evaluates them
  * when an event-time timer fires.
  *
  * A failure is appended to the list state; if no timer is pending, one is registered
  * 2 seconds (event time) after that failure. Any non-failure event removes the pending
  * timer and wipes the state. When the timer fires, an alert is emitted if at least
  * `failTimes` failures were collected, and the state is reset.
  *
  * @param failTimes minimum number of collected failures that triggers an alert
  */
class LoginFailWarningResult(failTimes: Int) extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {

  // All failures seen for the current key since the last reset.
  lazy val loginFailListState: ListState[LoginEvent] =
    getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginfail-list", classOf[LoginEvent]))

  // Timestamp of the pending timer; reads as 0 when no timer has been recorded.
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    value.eventType match {
      case "fail" =>
        // Remember the failure and make sure a timer is pending for this key.
        loginFailListState.add(value)
        val noTimerPending = timerTsState.value() == 0
        if (noTimerPending) {
          val fireAt = value.timeStamp * 1000L + 2000L
          ctx.timerService().registerEventTimeTimer(fireAt)
          timerTsState.update(fireAt)
        }

      case _ =>
        // Anything other than a failure starts over: drop the timer and all state.
        val pendingTimer = timerTsState.value()
        ctx.timerService().deleteEventTimeTimer(pendingTimer)
        loginFailListState.clear()
        timerTsState.clear()
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {
    // Drain the list state into an immutable list, preserving insertion order.
    val collected = List.newBuilder[LoginEvent]
    val stored = loginFailListState.get().iterator()
    while (stored.hasNext) {
      collected += stored.next()
    }
    val failures = collected.result()
    val failureCount = failures.length

    // Alert only when the number of failures reached the configured threshold.
    if (failureCount >= failTimes) {
      val warning = LoginFailWarning(
        failures.head.userId,
        failures.head.timeStamp,
        failures.last.timeStamp,
        "login fail in 2s for " + failureCount + " times."
      )
      out.collect(warning)
    }

    // Reset so the next failure opens a fresh window.
    loginFailListState.clear()
    timerTsState.clear()
  }
}
//输出结果
LoginFailWarning(1035,1558430842,1558430844,login fail in 2s for 3 times.)
方法二:
可以不用定时器触发,直接在状态中存取上一次登录失败的事件,每次都做判断和比对。
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
  * Method 2: login-failure detection without timers.
  *
  * Reads the login log as text, parses each line into a [[LoginEvent]], assigns
  * event-time timestamps and watermarks, keys the stream by user id and lets
  * [[LoginFailWaringAdvanceResult]] compare each failure with the previously
  * stored one. Alerts are printed to stdout.
  */
object LoginFailAdvance {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv")

    // Convert each CSV line into the case class, then extract timestamps and watermarks.
    val loginEventStream = inputStream
      .map { data =>
        val arr = data.split(",")
        // Plain method-call syntax: the former postfix form `arr(0) toLong` requires
        // scala.language.postfixOps and differed from the parsing in LoginFailWithCep.
        LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      }
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
          // LoginEvent.timeStamp is scaled by 1000 to obtain the millisecond timestamp Flink expects.
          override def extractTimestamp(t: LoginEvent): Long = {
            t.timeStamp * 1000L
          }
        })

    // Detect consecutive failures per user within 2 seconds and emit an alert for each pair.
    val loginFailWarningStream = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWaringAdvanceResult())

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}
/**
  * Keyed process function that alerts as soon as two failed logins of the same user
  * lie less than 2 seconds apart, without using timers.
  *
  * The state holds at most one event: the most recent failure (by event timestamp)
  * seen for the current key. Every new failure is compared against it; any
  * non-failure event clears the state.
  *
  * Unlike a plain `new < old + 2` comparison, the gap is measured as an absolute
  * difference, so a failure that arrives out of order with a much older timestamp
  * neither raises a false alarm nor replaces the newer failure held in state.
  * For in-order input the emitted alerts are unchanged.
  */
class LoginFailWaringAdvanceResult() extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {
  // State holding the previously seen login failure for the current key.
  lazy val loginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginfail-list", classOf[LoginEvent]))

  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    // Branch on the event type first.
    if (value.eventType == "fail") {
      val iter = loginFailListState.get().iterator()
      if (iter.hasNext) {
        // A failure is already stored, i.e. this is at least the second failure for this user.
        val previousFail: LoginEvent = iter.next()
        val earlier = math.min(previousFail.timeStamp, value.timeStamp)
        val later = math.max(previousFail.timeStamp, value.timeStamp)
        // Alert when the two failures are less than 2 (timestamp units) apart, in either arrival order.
        if (later - earlier < 2) {
          out.collect(LoginFailWarning(value.userId, earlier, later, "login fail 2 times in 2s"))
        }
        // Keep whichever failure is newer in event time as the reference for the next comparison.
        if (value.timeStamp >= previousFail.timeStamp) {
          loginFailListState.clear()
          loginFailListState.add(value)
        }
      } else {
        // First failure for this user: just remember it.
        loginFailListState.add(value)
      }
    } else {
      // A non-failure event breaks the run of failures.
      loginFailListState.clear()
    }
  }
}
//输出结果
LoginFailWarning(1035,1558430842,1558430843,login fail 2 times in 2s)
LoginFailWarning(1035,1558430843,1558430844,login fail 2 times in 2s)
* 方法三:CEP(复杂事件处理)(重点)
Flink 为我们提供了 CEP(Complex Event Processing,复杂事件处理)库,用于在流中筛选符合某种复杂模式的事件。
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * Method 3: login-failure detection with the Flink CEP library.
  *
  * Builds a keyed stream of [[LoginEvent]]s, declares a pattern of a failure directly
  * followed by another failure within 2 seconds, applies it per user and prints the
  * alerts produced by [[LoginFailEventMatch]].
  * (Author's note on this approach: CEP can cope with out-of-order events.)
  */
object LoginFailWithCep {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv")

    // Step A: parse the CSV lines.
    val parsedEvents = inputStream.map { line =>
      val fields = line.split(",")
      LoginEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
    }

    // Step B: attach event-time timestamps and watermarks tolerating 3 seconds of lateness.
    val timestamper = new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
      override def extractTimestamp(element: LoginEvent): Long = element.timeStamp * 1000L
    }
    val timedEvents = parsedEvents.assignTimestampsAndWatermarks(timestamper)

    // Step C: partition by user so the pattern is evaluated per user.
    val loginEventBykeyStream: KeyedStream[LoginEvent, Long] = timedEvents.keyBy(_.userId)

    // 1. Declare the pattern: one failed login immediately followed by another failed login.
    val firstFailure = Pattern.begin[LoginEvent]("firstFail").where(_.eventType == "fail")
    val twoFailures = firstFailure.next("secondFail").where(_.eventType == "fail")
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = twoFailures.within(Time.seconds(2))

    // 2. Apply the pattern to the keyed stream and turn every match into a warning.
    val patternStream = CEP.pattern(loginEventBykeyStream, loginFailPattern)
    val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())

    loginFailWarningStream.print()
    env.execute("login fail with cep job")
  }
}
/**
  * Turns one CEP match into a [[LoginFailWarning]].
  *
  * The match map is keyed by pattern name ("firstFail", "secondFail"); each value is the
  * list of events matched under that name, of which the first element is used.
  */
class LoginFailEventMatch() extends PatternSelectFunction[LoginEvent, LoginFailWarning] {
  override def select(map: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {
    // Fetch the first event recorded under the given pattern name.
    def firstMatched(patternName: String): LoginEvent =
      map.get(patternName).iterator().next()

    val opening = firstMatched("firstFail")
    val closing = firstMatched("secondFail")
    LoginFailWarning(opening.userId, opening.timeStamp, closing.timeStamp, "login fail")
  }
}
//输出结果
LoginFailWarning(1035,1558430841,1558430842,login fail)
LoginFailWarning(1035,1558430843,1558430844,login fail)
标签:flink,val,LoginEvent,org,Flink,恶意,CEP,apache,import 来源: https://blog.csdn.net/qq_36213530/article/details/120111770