编程语言
首页 > 编程语言> > java – KafkaStreams serde异常

java – KafkaStreams serde异常

作者:互联网

我正在玩Kafka和溪流技术;我已经为KStream创建了一个自定义序列化器和反序列化器,我将用它来接收来自给定主题的消息.

现在,问题是我正在以这种方式创建一个serde:

JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

串行器实现:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}  

反序列化器实现:

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer() {

    }

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        System.out.print(data);
        if(data == null){
            return null;
        }

        return gson.fromJson(new String(data),deserializedClass);
    }

    @Override
    public void close() {

    }
}

当我尝试执行代码时,我收到以下错误:

Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?

完全转储:https://pastebin.com/WwpuXuxB

这是我尝试使用serde的方式:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);

KStream<String, EventMessage> outStream = eventsStream
            .mapValues(value -> EventMessage.build(value.type, value.timestamp));

outStream.to("output");

此外,我不完全确定我正确设置了全局设置序列化器和反序列化器的属性:

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());

解决方法:

另一种方法是使用StreamsBuilder而不是KStreamBuilder. KStreamBuilder在1.0.0中已弃用.您可以在创建流时使用Consumed.with直接传递serde对象.您无需在此方案中创建自定义Serde类.

Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(topic, Consumed.with(Serdes.String(), messageSerde));

您可以将StringSerde保留在下面的代码中,而不是使用失败的messageSerde.getClass(),因为messageSerde只是一个没有非参数构造函数的WrappedSerde.

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());

标签:apache-kafka-streams,java,apache-kafka
来源: https://codeday.me/bug/20190724/1522957.html