java - How to map Kafka topic names to their corresponding records in Spark Streaming
I am streaming from a Kafka topic as follows:
JavaPairInputDStream<String, String> directKafkaStream =
        KafkaUtils.createDirectStream(jssc,
                String.class,        // key class
                String.class,        // value class
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topicSet);
directKafkaStream.print();
For one topic, the output looks like this:
(null,"04/15/2015","18:44:14")
(null,"04/15/2015","18:44:15")
(null,"04/15/2015","18:44:16")
(null,"04/15/2015","18:44:17")
How can I map the topic name to each record?
For example, if the topic is "callData", the output should look like the following, and so on:
(callData,"04/15/2015","18:44:14")
(callData,"04/15/2015","18:44:15")
(callData,"04/15/2015","18:44:16")
(callData,"04/15/2015","18:44:17")
Solution:
How do I map topic name and records?
To extract the topic information, you'll need to use the overload of createDirectStream which accepts a Function receiving a MessageAndMetadata<K, V> and returning the type you want to transform each record into.
It looks like this:
// Map each (topic, partition) pair to the offset to start reading from.
Map<TopicAndPartition, Long> map = new HashMap<>();
map.put(new TopicAndPartition("topicname", 0), 1L);

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
        javaContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        Map.Entry.class, // <--- This is the record type returned by the transformation.
        kafkaParams,
        map,
        messageAndMetadata ->
                new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                              messageAndMetadata.message()));
Note that I use Map.Entry as the Java substitute for Scala's Tuple2. You could instead provide your own class with topic and message properties and use it in the transformation. Also note that the type of the Kafka input stream is now JavaInputDStream<Map.Entry>, because that is what the transformation returns.
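To illustrate the "your own class" alternative mentioned above, here is a minimal sketch of such a record class. The name TopicRecord (and its accessors) is my own invention for this example, not part of any Spark or Kafka API; you would pass TopicRecord.class as the record type and construct it in the message handler.

```java
// A possible replacement for Map.Entry in the createDirectStream call above:
// a small immutable holder for the topic name and the message payload.
// (TopicRecord is a hypothetical name chosen for this sketch.)
class TopicRecord {
    private final String topic;
    private final String message;

    TopicRecord(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    public String getTopic()   { return topic; }
    public String getMessage() { return message; }

    // Render in the "(topic,message)" shape shown in the question's output.
    @Override
    public String toString() {
        return "(" + topic + "," + message + ")";
    }
}
```

With this class, the transformation lambda would become something like `messageAndMetadata -> new TopicRecord(messageAndMetadata.topic(), messageAndMetadata.message())`, and the stream type would be JavaInputDStream<TopicRecord>.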
Tags: apache-spark, apache-kafka, java  Source: https://codeday.me/bug/20191118/2029323.html