编程语言
首页 > 编程语言 > Flink编程基本步骤和加载不同类型数据源

Flink编程基本步骤和加载不同类型数据源

作者:互联网

Flink编程基本步骤:

1. 创建流执行环境:通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取流执行环境。

2.加载数据源 Source

3.转换操作 Transformation

4. 输出 Sink:将结果写入其他存储系统(如数据仓库),或直接打印输出。

关于Flink 数据的基本操作 —— 四种分类

  1. 单条数据的操作 map filter

  2. 多条数据的操作 window

  3. 多个流合并成一个流操作 connect union join

  4. 将一个流拆分成多个流的操作:split(已废弃)、侧输出流(OutputTag)

Flink输入数据源 source

自带预定义Source

自定义Source

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Flink job backed by a custom, non-parallel {@link SourceFunction}.
 *
 * <p>Author: itcast, 2021/11/29 14:56.
 *
 * <p>Once per second the source produces a random {@link Order}. Each order carries an order
 * id, a user id, an amount, an epoch-millisecond timestamp and that timestamp formatted as a
 * date-time string. The job prints every order to stderr and keeps running until it is
 * cancelled.
 */
public class OrderSource {

    /**
     * Wires the order source to a stderr sink and submits the job.
     *
     * @param args ignored
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for every operator of this job.
        streamEnv.setParallelism(1);

        final DataStreamSource<Order> orders = streamEnv.addSource(new OrderEmitSource());
        orders.printToErr();

        streamEnv.execute();
    }

    /** Emits one randomly generated {@link Order}, then pauses one second, until cancelled. */
    public static class OrderEmitSource implements SourceFunction<Order> {
        /** Cleared by {@link #cancel()}; volatile so the emitting loop observes the change. */
        private volatile boolean running = true;

        /**
         * Generates orders and hands each one to {@code ctx} while the source is running.
         *
         * @param ctx context used to emit the generated orders
         * @throws Exception propagated from emitting or from an interrupted pause
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            final Random random = new Random();
            final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (running) {
                ctx.collect(randomOrder(random, formatter));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /** Requests the generation loop to stop once its current iteration finishes. */
        @Override
        public void cancel() {
            running = false;
        }

        /**
         * Builds one order with these values:
         * <ul>
         *   <li>a random UUID as the order id;</li>
         *   <li>a user id in [0, 5];</li>
         *   <li>an amount in [0, 100);</li>
         *   <li>the current time, both as epoch milliseconds and formatted with
         *       {@code formatter}.</li>
         * </ul>
         */
        private static Order randomOrder(Random random, SimpleDateFormat formatter) {
            final String orderId = UUID.randomUUID().toString();
            final int userId = random.nextInt(6);
            final double amount = random.nextDouble() * 100;
            final long now = System.currentTimeMillis();
            return new Order(orderId, userId, amount, now, formatter.format(now));
        }
    }

    /**
     * One generated order.
     *
     * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}, an
     * all-arguments constructor and a no-arguments constructor.
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        /** Order id (a random UUID string). */
        private String oid;
        /** User id. */
        private int uid;
        /** Order amount. */
        private double money;
        /** Creation time in epoch milliseconds. */
        private long timestamp;
        /** Creation time formatted as {@code yyyy-MM-dd HH:mm:ss}. */
        private String datetime;
    }
}

 实现ParallelSourceFunction 接口案例

 并行生成数据:在 source 算子上调用 setParallelism(n) 设置并行度。

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Flink job backed by a custom <em>parallel</em> source.
 *
 * <p>Author: itcast, 2021/11/29 14:56.
 *
 * <p>The environment-wide parallelism is 1, but the source operator overrides it with 6. Each
 * of the 6 source instances independently emits one random {@link Order} and then pauses for
 * its configured interval, which is 5 seconds by default. Each order carries:
 * <ul>
 *   <li>a random UUID as the order id;</li>
 *   <li>a user id in [0, 5];</li>
 *   <li>an amount in [0, 100);</li>
 *   <li>an epoch-millisecond timestamp and that timestamp formatted as
 *       {@code yyyy-MM-dd HH:mm:ss}.</li>
 * </ul>
 * Every order is printed to stderr.
 */
public class OrderParallelismSource {

    /**
     * Builds and submits the job.
     *
     * @param args ignored
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        //1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Default parallelism for all operators
        env.setParallelism(1);
        //3. Add the custom parallel source; the operator-level setting overrides the default above
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
        //4. Print to stderr
        source.printToErr();
        //5. Execute the job
        env.execute();
    }

    /**
     * Parallel source: every parallel instance runs {@link #run} independently. Each instance
     * emits one random {@link Order} and then pauses for {@code intervalMillis}, until it is
     * cancelled.
     */
    public static class OrderEmitSource implements ParallelSourceFunction<Order> {
        /** Default pause between two consecutive orders of one parallel instance: 5 seconds. */
        public static final long DEFAULT_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(5);

        /** Pause after each emitted order, in milliseconds. */
        private final long intervalMillis;

        /** Cleared by {@link #cancel()}; volatile so the emitting loop observes the change. */
        private volatile boolean isRunning = true;

        /** Creates a source that pauses {@link #DEFAULT_INTERVAL_MILLIS} between orders. */
        public OrderEmitSource() {
            this(DEFAULT_INTERVAL_MILLIS);
        }

        /**
         * Creates a source with a custom pause between orders.
         *
         * @param intervalMillis pause after each emitted order, in milliseconds; must be >= 0
         * @throws IllegalArgumentException if {@code intervalMillis} is negative
         */
        public OrderEmitSource(long intervalMillis) {
            if (intervalMillis < 0) {
                throw new IllegalArgumentException(
                        "intervalMillis must be >= 0, got " + intervalMillis);
            }
            this.intervalMillis = intervalMillis;
        }

        /**
         * Generates orders and emits them through {@code ctx} while the source is running.
         *
         * @param ctx context used to emit the generated orders
         * @throws Exception propagated from emitting or from an interrupted pause
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            // Created per instance, so parallel instances never share the non-thread-safe formatter.
            Random rm = new Random();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                String oid = UUID.randomUUID().toString();
                // user id in [0, 5]
                int uid = rm.nextInt(6);
                // amount in [0, 100)
                double money = rm.nextDouble() * 100;
                long timestamp = System.currentTimeMillis();
                String datetime = sdf.format(timestamp);
                ctx.collect(new Order(oid, uid, money, timestamp, datetime));
                // Pause for the configured interval (5 s by default) before the next order.
                TimeUnit.MILLISECONDS.sleep(intervalMillis);
            }
        }

        /** Requests the generation loop to stop once its current iteration finishes. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * One generated order.
     *
     * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}, an
     * all-arguments constructor and a no-arguments constructor.
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}

实现 RichSourceFunction 案例:从 MySQL 读取数据

 1:准备 MySQL 数据库和数据表

# Create the database. It is named flink to match the JDBC URL used by the reader job
# (jdbc:mysql://node1:3306/flink). IF NOT EXISTS keeps the script re-runnable.
CREATE DATABASE IF NOT EXISTS flink;
# Switch to that database
USE flink;
# Recreate the table and load the sample rows
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

# Re-enable the foreign key checks disabled above
SET FOREIGN_KEY_CHECKS = 1;

 2:Flink读取MySQL的数据源

package cn.itcast.flink.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

/**
 * Reads the {@code user} table of a MySQL database every 5 minutes and prints each row to
 * stderr.
 *
 * <p>Author: itcast, 2022/1/11 16:17.
 *
 * <p>Steps:
 * <ol>
 *   <li>Create and populate the {@code flink} database and its {@code user} table.</li>
 *   <li>Get the stream execution environment.</li>
 *   <li>Set the parallelism.</li>
 *   <li>Add a custom source implementing {@code RichSourceFunction}. The "rich" variant adds
 *       {@code open}, {@code close} and {@code getRuntimeContext}.
 *     <ul>
 *       <li>{@code open} creates the connection and the statement.</li>
 *       <li>{@code run} queries the table and maps every row to a {@link User}.</li>
 *       <li>{@code close} releases the statement and the connection.</li>
 *     </ul>
 *   </li>
 *   <li>Print the result.</li>
 *   <li>Execute the job.</li>
 * </ol>
 */
public class UserSource {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL = "jdbc:mysql://node1:3306/flink?useSSL=false";
    private static final String JDBC_USER = "root";
    // Demo credentials; a real job should read these from configuration instead.
    private static final String JDBC_PASSWORD = "123456";
    private static final String QUERY = "select id,username,password,name from user";
    /** Pause between two full reads of the table. */
    private static final long POLL_INTERVAL_MINUTES = 5;

    /**
     * Builds and submits the job.
     *
     * @param args ignored
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        //2. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3. Set the parallelism
        env.setParallelism(1);
        //4. Add the custom MySQL source
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            // Created in open() after the function has been shipped to the task. The fields are
            // transient because JDBC handles are not serializable.
            private transient Connection conn;
            private transient Statement statement;
            // Written by cancel(), which Flink calls from a different thread than run().
            // It is volatile so the read loop is guaranteed to observe the change.
            private volatile boolean isRunning = true;

            /**
             * Loads the JDBC driver, opens the connection and creates the statement.
             *
             * @param parameters Flink configuration (unused)
             * @throws Exception if the driver is missing or the connection fails
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                Class.forName(JDBC_DRIVER);
                conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
                statement = conn.createStatement();
            }

            /**
             * Repeatedly reads the whole table, emits one {@link User} per row, then waits
             * {@code POLL_INTERVAL_MINUTES} before the next read, until cancelled.
             *
             * @param ctx context used to emit the users
             * @throws Exception on query failure or when the pause is interrupted
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                while (isRunning) {
                    // Close each result set once consumed, instead of keeping it open during the pause.
                    try (ResultSet rs = statement.executeQuery(QUERY)) {
                        while (rs.next()) {
                            ctx.collect(toUser(rs));
                        }
                    }
                    TimeUnit.MINUTES.sleep(POLL_INTERVAL_MINUTES);
                }
            }

            /** Asks the read loop to stop after its current iteration. */
            @Override
            public void cancel() {
                isRunning = false;
            }

            /**
             * Closes the statement and then the connection.
             *
             * <p>Either handle may still be null if {@code open()} failed part-way. Null checks
             * keep an NPE here from hiding that original failure.
             *
             * @throws Exception if closing a JDBC resource fails
             */
            @Override
            public void close() throws Exception {
                try {
                    if (statement != null && !statement.isClosed()) {
                        statement.close();
                    }
                } finally {
                    // Release the connection even if closing the statement failed.
                    if (conn != null && !conn.isClosed()) {
                        conn.close();
                    }
                }
            }
        });
        //5. Print the result
        source.printToErr();
        //6. Execute the job
        env.execute();
    }

    /**
     * Maps the current row of {@code rs} to a {@link User}.
     *
     * @param rs a result set positioned on a row with columns id, username, password, name
     * @return the populated user
     * @throws SQLException if a column cannot be read
     */
    private static User toUser(ResultSet rs) throws SQLException {
        User user = new User();
        user.setId(rs.getInt("id"));
        user.setUsername(rs.getString("username"));
        user.setPassword(rs.getString("password"));
        user.setName(rs.getString("name"));
        return user;
    }

    /**
     * One row of the {@code user} table.
     *
     * <p>It has a public no-arg constructor and getters/setters, the shape Flink expects of a
     * POJO type.
     */
    public static class User {
        // id
        private int id;
        // login name
        private String username;
        // password (note: included in toString(), so it appears in the printed output)
        private String password;
        // display name
        private String name;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}

标签:flink,String,数据源,Flink,private,并行度,import,public,加载
来源: https://blog.csdn.net/weixin_53150969/article/details/122707597