编程语言
首页 > 编程语言> > java – 将字节数组发送到storm kafka bolt

java – 将字节数组发送到storm kafka bolt

作者:互联网

我写了一个风暴拓扑.我基本上想要以字节数组的形式将avro架构中的元组发送到kafka主题.

这就是我设置螺栓的方法:

  builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
            .fieldsGrouping(BOLT1, new Fields("key"));

这就是我转换为字节数组的方式

Schema schema = avroObject.getSchema();

        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(ping, encoder);
        encoder.flush();
        byte[] message = out.toByteArray();
        String key = new String(message, "UTF-8");

当我以下面的方式发出元组时,我在kafka主题中看不到任何内容(将字节流发送到kafka):

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

但相反如果我将字节数组转换为字符串,然后转换为kafka主题它的工作原理:

如下所示:

 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
            .fieldsGrouping(BOLT1, new Fields("key"));

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

我究竟做错了什么?如何使用storm kafka bolt将字节流发送到kafka主题?

解决方法:

您有问题,因为您的MD5哈希不正确:

你说如果你将你的bytearray转换为java String它就可以了:这是因为根据一个String,MD5的值是正确的.

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

正如您所看到的,MD5是在String参数上计算的,并且您发送了与MD5对应的String:一切都很好!

但是如果你发送一个bytearray,你需要在bytearray上计算MD5,结果它将是一个bytearray,而不是String.你的代码:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

是不正确的,因为MD5不对应于消息,而是UTF-8中转换后的消息值为有损的字符串(见下文).

以下是关于SO的另一个问题的链接,以正确的方式计算MD5:

How can I generate an MD5 hash?

这是因为在Java中将bytearray转换为String是有损的(与C相反),并且您将错过该过程中的数据,因为某些字节与Java编码中的char不对应(您的数据中显然有一些这些字符).

所以你的KafkaBolt应该是

KafkaBolt<byte[], byte[]>

我不知道在kafka storm中发送一个bytearray MD5和你的bytearray是否足够.如果不是,则必须使用在bytearray和java String之间无损的编码,例如BASE64:

Base64 Encoding in Java

您必须使用将bytearray转换为base64字符串

KafkaBolt<String, String>

然后像往常一样发送数据

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));

这也意味着当你从kafka获取数据时,它将是base64中的一个String,你必须解码才能获得bytearray.

希望有所帮助.

标签:java,apache-kafka,kafka-consumer-api,apache-storm
来源: https://codeday.me/bug/20190519/1138011.html