数据库
首页 > 数据库> > Google Dataflow(Apache Beam)将JdbcIO批量插入mysql数据库

Google Dataflow(Apache Beam)将JdbcIO批量插入mysql数据库

作者:互联网

我正在使用Dataflow SDK 2.X Java API(Apache Beam SDK)将数据写入mysql.我创建了基于Apache Beam SDK documentation的管道,以使用数据流将数据写入mysql.当我需要实现批量插入时,它会插入单行.我没有在官方文档中找到任何选项来启用批量插入模式.

想知道是否可以在数据流管道中设置批量插入模式?如果是,请告诉我以下代码中需要更改的内容.

 .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query) {
          query.setInt(1, kv.getKey());
          query.setString(2, kv.getValue());
        }
      })

解决方法:

编辑2018-01-27:

事实证明,此问题与DirectRunner有关.如果使用DataflowRunner运行相同的管道,则应该获得实际上多达1,000条记录的批处理.分组操作后,DirectRunner始终创建大小为1的束.

原始答案:

使用Apache Beam的JdbcIO写入云数据库时,我遇到了同样的问题.问题是,尽管JdbcIO确实支持批量写入多达1,000条记录,但实际上我从未见过它一次写入多于1行(我必须承认:这始终在开发环境中使用DirectRunner).

因此,我为JdbcIO添加了一个功能,您可以通过将数据分组在一起并将每个组写为一个批处理来自己控制批处理的大小.以下是基于Apache Beam原始WordCount示例的如何使用此功能的示例.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

与JdbcIO的常规写入方法的区别是采用PCollection< Iterable< RowT>的新方法writeIterable().作为输入而不是PCollection< RowT>.每个Iterable都批量写入数据库.

可以在以下位置找到具有此附加功能的JdbcIO版本:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

包含上述示例的整个示例项目可以在这里找到:https://github.com/olavloite/spanner-beam-example

(Apache Beam上还有一个待处理的拉取请求,将其包含在项目中)

标签:apache-beam,apache-beam-io,mysql,google-cloud-dataflow
来源: https://codeday.me/bug/20191025/1929148.html