
java - How do I map Kafka topic names to the corresponding records in Spark Streaming?


I am streaming from Kafka topics as follows:

JavaPairInputDStream<String, String> directKafkaStream = 
    KafkaUtils.createDirectStream(jssc,
                                  String.class, 
                                  String.class,
                                  StringDecoder.class,
                                  StringDecoder.class,
                                  kafkaParams, 
                                  topicSet);

directKafkaStream.print();   

For a single topic, the output looks like this:

(null,"04/15/2015","18:44:14")
(null,"04/15/2015","18:44:15")
(null,"04/15/2015","18:44:16")
(null,"04/15/2015","18:44:17")  

How can I map the topic name onto each record?
For example, if the topic is "callData", the output should look like the following, and so on:

(callData,"04/15/2015","18:44:14")
(callData,"04/15/2015","18:44:15")
(callData,"04/15/2015","18:44:16")
(callData,"04/15/2015","18:44:17")  

Solution:

How do I map topic name and records?

To extract the topic name, you'll need to use the overload of createDirectStream that accepts a Function receiving a MessageAndMetadata<K, V> and returning the type you want each record transformed into.

It looks like this:

Map<TopicAndPartition, Long> map = new HashMap<>();
map.put(new TopicAndPartition("topicname", 0), 1L); // start reading partition 0 of "topicname" from offset 1

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
        javaContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        Map.Entry.class, // <--- This is the record type returned by the transformation.
        kafkaParams,
        map,             // fromOffsets: where to start consuming each topic-partition
        // Message handler: turn each MessageAndMetadata into a (topic, message) pair.
        messageAndMetadata ->
            new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                          messageAndMetadata.message()));
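
With that handler in place, each record carries its topic name. One quick way to check (just a sketch) is to walk the stream and print key and value:

// Print each record as "topic -> message" (runs on the executors).
stream.foreachRDD(rdd ->
    rdd.foreach(entry ->
        System.out.println(entry.getKey() + " -> " + entry.getValue())));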

Note that I use Map.Entry as a Java substitute for Scala's Tuple2. You can instead provide your own class with topic and message fields and use it in the transformation. Also note that the type of the Kafka input stream is now JavaInputDStream<Map.Entry>, because that is what the transformation returns.
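
If you prefer a typed stream over the raw Map.Entry, a minimal sketch of such a class could look like the following (the name TopicAndMessage is made up for illustration):

import java.io.Serializable;

// Hypothetical record class: one Kafka message tagged with its topic name.
// It must be Serializable so Spark can ship instances between driver and executors.
public class TopicAndMessage implements Serializable {
    private final String topic;
    private final String message;

    public TopicAndMessage(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    public String getTopic()   { return topic; }
    public String getMessage() { return message; }

    @Override
    public String toString() {
        return "(" + topic + "," + message + ")";
    }
}

You would then pass TopicAndMessage.class as the record class and map each messageAndMetadata to new TopicAndMessage(messageAndMetadata.topic(), messageAndMetadata.message()), giving a JavaInputDStream<TopicAndMessage>.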

Tags: apache-spark, apache-kafka, java
Source: https://codeday.me/bug/20191118/2029323.html