其他分享
首页 > 其他分享> > Flink Sink:接收器

Flink Sink:接收器

作者:互联网

flink代码分为三部分:

1、Source----数据源,读取数据

2、Transformation----转换,对数据进行处理,也就是算子

3、Sink----将数据发出去

Flink 将转换计算后的数据发送的地点 。
Flink 常见的 Sink 大概有如下几类:

1、写入文件
2、打印出来
3、写入 socket
4、自定义的 sink 。

自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

1、写入文件

这个东西不要硬记,要学会看官网

package com.shujia.flink.sink

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo1FileSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //读取数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    /**
      * 写入文件
      */
      
    val sink: StreamingFileSink[String] = StreamingFileSink
      //指定保存路径和数据的编码格式
      .forRowFormat(new Path("data/flink/out"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          //滚动生成文件的策略
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
          //至少包含15分钟的数据
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          //最近5分钟没有收到新的记录
          .withMaxPartSize(1024 * 1024 * 1024)//文件大小达到1G(写入最后一条记录之后)
          .build())
      .build()

    linesDS.addSink(sink)

    env.execute()
  }
}

//执行结果会生成一个out目录,该目录下会生成结果的文件

4、自定义的 sink

自定义sink就是实现SinkFunction

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo2SinkFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    //使用自定义的sink
    studentDS.addSink(new MysqlSink)

    env.execute()
  }
}

/**
  * 自定义sink
  * SinkFunction : 普通sink
  * RichSinkFunction: 多了 open 和 close 方法
  */

class MysqlSink extends RichSinkFunction[String] {
  /**
    * open 在invoke之前执行,每一个task中只执行一次
    * 所以使用RichSinkFunction,
    * RichSinkFunction 比 SinkFunction多了open()、close()方法
    * 可以将加载驱动、创建链接放入open()内
    */ 
  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    println("open")
    //1、加载驱动
    Class.forName("com.mysql.jdbc.Driver")
    //创建链接
    //?useUnicode=true&characterEncoding=utf-8 -- 写数据了所以要指定编码格式
    con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=utf-8", "root", "123456")
  }

   /**
    * close 在invoke之后执行,每一个task中只 _?_ 执行
    */
  override def close(): Unit = {
    println("close")
    con.close()
  }

  /**
    * invoke:每一条数据执行一次
    * @param stu     : 一条数据
    * @param context : 上下文对象
    */
    
  override def invoke(stu: String, context: SinkFunction.Context[_]): Unit = {

    //切分数据
    val split: Array[String] = stu.split(",")
    
    //创建PreparedStatement
    val stat: PreparedStatement = con.prepareStatement("insert into student (id,name,age,gender,clazz) values(?,?,?,?,?)")

    //设置参数
    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))

    //执行插入
    stat.execute()

  }
}

自定义 sink ,统计单词的数量,并把结果写入MySQL中

replace 如果主键不存在就插入,如果主键存在就替换,和 insert 用法相同

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object Demo3WcSInkMysql {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val countDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    countDS.addSink(new RichSinkFunction[(String, Int)] {

      var con: Connection = _

      override def open(parameters: Configuration): Unit = {
        //1、加载驱动
        Class.forName("com.mysql.jdbc.Driver")
        //创建链接
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=utf-8", "root", "123456")
      }

      override def close(): Unit = con.close()

      override def invoke(kv: (String, Int), context: SinkFunction.Context[_]): Unit = {
        //replace 如果主键不存在就插入,如果主键存在就替换
        val stat: PreparedStatement = con.prepareStatement("replace into word_count(word,count) values(?,?)")

        stat.setString(1, kv._1)
        stat.setInt(2, kv._2)

        stat.execute()

      }
    })

    env.execute()
  }
}

标签:接收器,Flink,String,flink,Sink,org,apache,import,sink
来源: https://www.cnblogs.com/saowei/p/16029685.html