其他分享
首页 > 其他分享> > Flink_恶意登录监控 (利用CEP)

Flink_恶意登录监控 (利用CEP)

作者:互联网

        对于网站而言，用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败，就有可能是出现了程序的恶意攻击，比如密码暴力破解。因此我们考虑对用户的登录失败动作进行统计：具体来说，如果同一用户（可以是不同 IP）在 2 秒之内连续两次登录失败，就认为存在恶意登录的风险，输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。下面给出三种实现方法，其中方法三利用 Flink 的 CEP 库对事件流进行模式匹配，是重点方法。使用 CEP 需要在 pom 文件中引入相关依赖：
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> 
    <version>${flink.version}</version> 
</dependency>

输入数据: 

方法一:

状态编程:

        由于引入了时间，最简单的方法其实与之前的热门统计类似：只需要按照用户 ID 分流，遇到登录失败的事件时将其保存在 ListState 中，然后设置一个定时器，2 秒后触发。定时器触发时检查状态中的登录失败事件个数，如果大于等于 2，就输出报警信息。（这种做法只能在 2 秒之后去判断这期间是否有多次失败登录，而不能在一次登录失败之后、再一次登录失败时就立刻报警。）
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * A single login record read from the input log.
 *
 * @param userId    id of the user who attempted to log in
 * @param ip        client IP address of the attempt
 * @param eventType outcome of the attempt as written in the log; the jobs compare it with "fail"
 * @param timeStamp event time in seconds (the jobs multiply it by 1000 to get milliseconds)
 */
case class LoginEvent(
  userId: Long,
  ip: String,
  eventType: String,
  timeStamp: Long
)

// 输出报警信息样例类
case class LoginFailWarning(userId: Long, firstFailTime: Long, lastFailTime: Long, warnMsg: String)

/**
 * Job entry point for method 1 (keyed list state + event-time timer).
 *
 * Reads comma-separated login records `userId,ip,eventType,timestamp` (timestamp in
 * seconds), keys them by user id and hands them to [[LoginFailWarningResult]] with a
 * threshold of 2 failures, printing every warning it emits.
 */
object LoginFail {
  // Input file used when no path is given on the command line.
  private val DefaultInputPath =
    "D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * @param args optional; `args(0)` overrides the input CSV path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Parse each line into a LoginEvent. Fields are trimmed so stray spaces around a value
    // neither break `toLong` nor make the later `== "fail"` comparison silently miss.
    // Event time = seconds * 1000; watermarks tolerate 5 s of out-of-order data.
    val loginEventStream = inputStream.map { data =>
      val arr = data.split(",").map(_.trim)
      LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
    }.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
      override def extractTimestamp(t: LoginEvent): Long = t.timeStamp * 1000L
    })

    // Detect per user: warn when the user fails to log in repeatedly within 2 seconds.
    val loginFailWarningStream = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWarningResult(2))

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}

/**
 * Method 1: buffers a user's login failures in keyed state and evaluates them at an
 * event-time timer set 2 s after the failure that opened the current window.
 *
 * A successful login clears the buffer and cancels the pending timer. When the timer
 * fires, only failures whose event time is at or before the timer time are counted.
 * Failures received early but belonging to a later window are kept and open a new window,
 * so they never inflate the current window's count.
 *
 * @param failTimes minimum number of in-window failures that triggers a warning
 */
class LoginFailWarningResult(failTimes: Int) extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]{
  // Length of the detection window in milliseconds.
  private val windowMs = 2000L

  // Buffered login-failure events for the current key, in arrival order.
  lazy val loginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginfail-list", classOf[LoginEvent]))
  // Event time (ms) of the pending timer; reads as 0 when no timer is registered.
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      loginFailListState.add(value)
      // Only the failure that opens a window registers a timer.
      if (timerTsState.value() == 0) {
        val ts = value.timeStamp * 1000L + windowMs
        ctx.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
      }
    } else {
      // A successful login resets detection: drop the buffer and the pending timer.
      ctx.timerService().deleteEventTimeTimer(timerTsState.value())
      loginFailListState.clear()
      timerTsState.clear()
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {
    val buffered = ListBuffer.empty[LoginEvent]
    val iter = loginFailListState.get().iterator()
    while (iter.hasNext) {
      buffered += iter.next()
    }

    // Elements are processed in arrival order, so failures with event time after this window
    // can already be buffered when the watermark reaches the timer. Split on event time.
    val (inWindow, later) = buffered.toList.partition(_.timeStamp * 1000L <= timestamp)

    if (inWindow.nonEmpty && inWindow.length >= failTimes) {
      val failTimesSeen = inWindow.map(_.timeStamp)
      out.collect(
        LoginFailWarning(
          inWindow.head.userId,
          failTimesSeen.min,
          failTimesSeen.max,
          s"login fail in 2s for ${inWindow.length} times."
        ))
    }

    loginFailListState.clear()
    timerTsState.clear()

    // Carry failures that belong to a later window forward, opening a new window at the
    // earliest of them.
    if (later.nonEmpty) {
      later.foreach(e => loginFailListState.add(e))
      val nextTs = later.map(_.timeStamp).min * 1000L + windowMs
      ctx.timerService().registerEventTimeTimer(nextTs)
      timerTsState.update(nextTs)
    }
  }
}
//输出结果
LoginFailWarning(1035,1558430842,1558430844,login fail in 2s for 3 times.)

方法二:

可以不用定时器触发，直接在状态中存取上一次登录失败的事件，每来一个失败事件都做判断和比对。
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Job entry point for method 2 (keyed state only, no timers).
 *
 * Reads comma-separated login records `userId,ip,eventType,timestamp` (timestamp in
 * seconds), keys them by user id and runs [[LoginFailWaringAdvanceResult]], printing
 * every warning it emits.
 */
object LoginFailAdvance {
  // Input file used when no path is given on the command line.
  private val DefaultInputPath =
    "D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * @param args optional; `args(0)` overrides the input CSV path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream = env.readTextFile(args.headOption.getOrElse(DefaultInputPath))

    // Parse each line into a LoginEvent. Fields are trimmed so stray spaces neither break
    // `toLong` nor make the `== "fail"` comparison silently miss.
    // Event time = seconds * 1000; watermarks tolerate 3 s of out-of-order data.
    val loginEventStream = inputStream.map { data =>
      val arr = data.split(",").map(_.trim)
      LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
    }.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
      override def extractTimestamp(t: LoginEvent): Long = t.timeStamp * 1000L
    })

    // Detect per user: warn on two consecutive login failures within 2 seconds.
    val loginFailWarningStream = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWaringAdvanceResult())

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}

/**
 * Method 2: warns as soon as a login failure occurs less than 2 s (event time) from the
 * previously stored failure of the same user, without using timers.
 *
 * At most one event is kept in state: the most recently received failure. A non-"fail"
 * event clears it.
 */
class LoginFailWaringAdvanceResult() extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {
  // Holds at most one element: the most recently received login failure for this key.
  lazy val loginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginfail-list", classOf[LoginEvent]))

  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      val iter = loginFailListState.get().iterator()
      if (iter.hasNext) {
        // A previous failure exists for this user: compare the two event times.
        val previousFail: LoginEvent = iter.next()
        // Use the absolute gap. Records can arrive out of event-time order, and a failure far
        // *earlier* than the stored one must not count as "within 2 s".
        if (math.abs(value.timeStamp - previousFail.timeStamp) < 2) {
          out.collect(LoginFailWarning(
            value.userId,
            math.min(previousFail.timeStamp, value.timeStamp),
            math.max(previousFail.timeStamp, value.timeStamp),
            "login fail 2 times in 2s"))
        }
      }
      // The current failure becomes the reference for the next comparison.
      loginFailListState.clear()
      loginFailListState.add(value)
    } else {
      loginFailListState.clear()
    }
  }
}
//输出结果
LoginFailWarning(1035,1558430842,1558430843,login fail 2 times in 2s)
LoginFailWarning(1035,1558430843,1558430844,login fail 2 times in 2s)

* 方法三：CEP（复杂事件处理）（重点）

Flink 为我们提供了 CEP（Complex Event Processing，复杂事件处理）库，用于在流中筛选符合某种复杂模式的事件。
import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Job entry point for method 3 (Flink CEP).
 *
 * Author's note: CEP copes with out-of-order input. In event-time mode Flink CEP buffers
 * elements and evaluates them in timestamp order as the watermark advances, so disorder
 * within the 3 s watermark bound is handled.
 *
 * Reads comma-separated login records `userId,ip,eventType,timestamp` (timestamp in
 * seconds), keys them by user id, matches "fail immediately followed by fail within 2 s"
 * and prints a warning per match.
 */
object LoginFailWithCep {
  // Input file used when no path is given on the command line.
  private val DefaultInputPath =
    "D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * @param args optional; `args(0)` overrides the input CSV path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile(args.headOption.getOrElse(DefaultInputPath))

    // Parse each line (fields trimmed), assign event time = seconds * 1000 with a 3 s
    // out-of-orderness bound, and key by user.
    val loginEventBykeyStream: KeyedStream[LoginEvent, Long] = inputStream.map { data =>
      val arr = data.split(",").map(_.trim)
      LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
    }.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
      override def extractTimestamp(element: LoginEvent): Long = element.timeStamp * 1000L
    }).keyBy(_.userId)

    // 1. Pattern: a login failure immediately followed (strict contiguity via `next`) by
    //    another login failure, the whole match lying within 2 seconds.
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2))

    // 2. Apply the pattern to the keyed stream.
    val patternStream = CEP.pattern(loginEventBykeyStream, loginFailPattern)

    // 3. Turn each match into a warning and print it.
    val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())
    loginFailWarningStream.print()
    env.execute("login fail with cep job")
  }
}

/**
 * Converts one match of the `firstFail -> secondFail` pattern into a [[LoginFailWarning]].
 *
 * The `map` argument is keyed by pattern-state name ("firstFail", "secondFail"). Each value
 * lists the events matched by that state; the first event of each list is used.
 */
class LoginFailEventMatch() extends PatternSelectFunction[LoginEvent, LoginFailWarning] {
  override def select(map: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {
    // Pull the first event matched by the named pattern state.
    def matchedBy(stateName: String): LoginEvent = map.get(stateName).iterator().next()

    val opening = matchedBy("firstFail")
    val closing = matchedBy("secondFail")

    LoginFailWarning(
      userId = opening.userId,
      firstFailTime = opening.timeStamp,
      lastFailTime = closing.timeStamp,
      warnMsg = "login fail"
    )
  }
}

//输出结果
LoginFailWarning(1035,1558430841,1558430842,login fail)
LoginFailWarning(1035,1558430843,1558430844,login fail)

标签:flink,val,LoginEvent,org,Flink,恶意,CEP,apache,import
来源: https://blog.csdn.net/qq_36213530/article/details/120111770