编程语言
首页 > 编程语言> > 在Java中通过Spark DataFrame进行迭代而无需收集

在Java中通过Spark DataFrame进行迭代而无需收集

作者:互联网

我正在使用Spark 1.6.1

我有一个DataFrame,我需要对其进行遍历并将每一行写入Kafka.截至目前,我正在执行以下操作:

Producer<String><String> message;
for(Row x: my_df.collect()){
    kafka_message = new Producer<String><String>(topic, String.valueOf(x))
    my_kafka_producer.send(kafka_message);
}

这里的问题是,收集将数据发送到驱动程序,然后推送到kafka.鉴于我大约有250位执行者,我的1个驱动程序无法有效处理工作量.因此,我想知道如何在执行程序上遍历数据框.这将需要避免执行collect().我找到了一篇文章,粗略地解释了如何做到这一点,但是不幸的是,他们到GitHub的链接实际上已过期,所以我找不到实现它的方法.

文章供参考:
https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/

解决方法:

在Java中,您可以尝试以下类似方法.扩展AbstractFunction1

import scala.runtime.AbstractFunction1;

abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable {
}

现在,如下所示为您的数据框调用foreachPartition.

import scala.collection.Iterator;
import scala.runtime.BoxedUnit;

df.foreachPartition(new MyFunction1<Iterator<Row>,BoxedUnit>(){
        @Override
        public BoxedUnit apply(Iterator<Row> rows) {
            while(rows.hasNext()){
                //get the Row
                Row row = rows.next();
            }
            return BoxedUnit.UNIT;
        }
    });

标签:apache-spark,dataframe,loops,apache-kafka,java
来源: https://codeday.me/bug/20191111/2021194.html