数据库
首页 > 数据库> > 在Spark数据集中使用custome UDF withColumn; java.lang.String无法强制转换为org.apache.spark.sql.Row

在Spark数据集中使用custome UDF withColumn; java.lang.String无法强制转换为org.apache.spark.sql.Row

作者:互联网

我有一个包含许多字段的JSON文件.我在java中使用spark的Dataset读取文件.

> Spark版本2.2.0
> java jdk 1.8.0_121

下面是代码.

SparkSession spark = SparkSession
              .builder()
              .appName("Java Spark SQL basic example")
              .config("spark.some.config.option", "some-value")
              .master("local")
              .getOrCreate();

Dataset<Row> df = spark.read().json("jsonfile.json");

我想使用带有自定义UDF的withColumn函数来添加新列.

UDF1 someudf = new UDF1<Row,String>(){
        public String call(Row fin) throws Exception{
            String some_str = fin.getAs("String");
            return some_str;
        }
    };
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();

运行上面的代码时出现转换错误.
java.lang.String无法强制转换为org.apache.spark.sql.Row

问题:

1 – 读取行数据集是唯一的选择吗?我可以将df转换为df的字符串.但我无法选择字段.

2 – 尝试但未能定义用户定义的数据类型.我无法使用此自定义UDDatatype注册UDF.我需要用户定义的数据类型吗?

3 – 和主要问题,我如何从String转换为Row?

部分日志复制如下:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at Risks.readcsv$1.call(readcsv.java:1)
    at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512)
        ... 16 more

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string)

对你的帮助表示感谢.

解决方法:

您正在获得该异常,因为UDF将在列的数据类型(不是Row)上执行.考虑我们有Dataset< Row>具有两列col1和col2的ds都是String类型.现在,如果我们想使用UDF将col2的值转换为大写.

我们可以注册并调用UDF,如下所示.

spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();

或者使用withColumn

ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();

UDF应该如下所示.

private static UDF1 toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        return str.toUpperCase();
    }
};

标签:apache-spark-dataset,java,apache-spark,apache-spark-sql,user-defined-functions
来源: https://codeday.me/bug/20190823/1699259.html