在Java中通过Spark DataFrame进行迭代而无需收集
作者:互联网
我正在使用Spark 1.6.1
我有一个DataFrame,我需要对其进行遍历并将每一行写入Kafka.截至目前,我正在执行以下操作:
Producer<String><String> message;
for(Row x: my_df.collect()){
kafka_message = new Producer<String><String>(topic, String.valueOf(x))
my_kafka_producer.send(kafka_message);
}
这里的问题是,收集将数据发送到驱动程序,然后推送到kafka.鉴于我大约有250位执行者,我的1个驱动程序无法有效处理工作量.因此,我想知道如何在执行程序上遍历数据框.这将需要避免执行collect().我找到了一篇文章,粗略地解释了如何做到这一点,但是不幸的是,他们到GitHub的链接实际上已过期,所以我找不到实现它的方法.
解决方法:
在Java中,您可以尝试以下类似方法.扩展AbstractFunction1
import scala.runtime.AbstractFunction1;
abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable {
}
现在,如下所示为您的数据框调用foreachPartition.
import scala.collection.Iterator;
import scala.runtime.BoxedUnit;
df.foreachPartition(new MyFunction1<Iterator<Row>,BoxedUnit>(){
@Override
public BoxedUnit apply(Iterator<Row> rows) {
while(rows.hasNext()){
//get the Row
Row row = rows.next();
}
return BoxedUnit.UNIT;
}
});
标签:apache-spark,dataframe,loops,apache-kafka,java 来源: https://codeday.me/bug/20191111/2021194.html