其他分享
首页 > 其他分享> > Flink 自定义Source 并行度问题

Flink 自定义Source 并行度问题

作者:互联网

实现 SourceFunction

大多数情况下,Flink自身支持的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,想要读取的数据源来自某个外部系统,而flink既没有预实现的方法、也没有提供连接器,那就只好自定义实现SourceFunction了。接下来创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()。

run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

下面就实现一下:

    public static class MySource0625 implements SourceFunction<Event> {
        //是否执行
        private boolean ifRun = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            Random random = new Random();
            String[] users = {"令狐冲", "依琳", "莫大", "风清扬", "任盈盈", "林远图", "定仪"};
            String[] urls = {"/home", "/cart", "/pay", "/info?id?"};
            while (ifRun) {
                //直接返回一个对象
                ctx.collect(new Event(users[random.nextInt(urls.length)],
                        urls[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()));
                //1s  生成一条
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            //取消
            ifRun = false;
        }
    }

调用测试:

 env.addSource(new MySource0625()).print();

输出结果

3> Event{user='令狐冲', url='/home', timestamp=2022-06-25 08:09:36.577}
4> Event{user='令狐冲', url='/cart', timestamp=2022-06-25 08:09:37.588}
1> Event{user='令狐冲', url='/info?id?', timestamp=2022-06-25 08:09:38.592}
2> Event{user='风清扬', url='/cart', timestamp=2022-06-25 08:09:39.595}
3> Event{user='依琳', url='/home', timestamp=2022-06-25 08:09:40.598}

下面修改下并行度再次测试

 env.addSource(new MySource0625()).setParallelism(2).print();

报错

Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:114)
    at org.wdh01.chapter05ForTest.SourceTest0625.main(SourceTest0625.java:51)

原因是:SourceFunction接口定义的数据源,并行度只能设置为1;SourceFunction接口定义的数据源,并行度只能设置为1;SourceFunction接口定义的数据源,并行度只能设置为1

所以如果我们想要自定义并行的数据源的话,需要使用ParallelSourceFunction

ParallelSourceFunction

    public static class MyParaiSource implements ParallelSourceFunction<Event> {

        private boolean ifRun = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            String[] users = {"令狐冲", "依琳", "莫大", "风清扬", "任盈盈", "林远图", "定仪"};
            String[] urls = {"/home", "/cart", "/pay", "/info?id?"};
            Random random = new Random();
            while (ifRun) {
                //直接返回一个对象
                ctx.collect(new Event(users[random.nextInt(users.length)],
                        urls[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()));
                //1s  生成一条
                Thread.sleep(1000);
            }

        }


        @Override
        public void cancel() {
            //取消
            ifRun = false;
        }
    }

测试

 env.addSource(new MyParaiSource()).setParallelism(2).print();

输出

3> Event{user='任盈盈', url='/home', timestamp=2022-06-25 08:15:07.252}
2> Event{user='任盈盈', url='/pay', timestamp=2022-06-25 08:15:07.248}
4> Event{user='莫大', url='/pay', timestamp=2022-06-25 08:15:08.259}
3> Event{user='莫大', url='/home', timestamp=2022-06-25 08:15:08.259}
1> Event{user='依琳', url='/info?id?', timestamp=2022-06-25 08:15:09.262}
4> Event{user='任盈盈', url='/pay', timestamp=2022-06-25 08:15:09.262}
2> Event{user='莫大', url='/info?id?', timestamp=2022-06-25 08:15:10.265}
1> Event{user='令狐冲', url='/cart', timestamp=2022-06-25 08:15:10.265}
3> Event{user='林远图', url='/pay', timestamp=2022-06-25 08:15:11.268}
2> Event{user='林远图', url='/pay', timestamp=2022-06-25 08:15:11.268}

可以看到:实现了 ParallelSourceFunction 的自定义 Source ,是可以并行的,没有了 SourceFunction 的并行度限制,工作中可以根据业务需要灵活选择。

标签:25,06,自定义,url,timestamp,并行度,Source,2022,Event
来源: https://www.cnblogs.com/wdh01/p/16410710.html