kafka 拦截器
作者:互联网
kafka 拦截器
拦截器:允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。
Spring MVC 拦截器视图:
kafka 拦截器:
Kafka 拦截器分为生产者拦截器和消费者拦截器。
-
生产者拦截器:允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;
-
消费者拦截器:支持在消费消息前以及提交位移后编写特定逻辑。
这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
生产者端-拦截器
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
Producer 端拦截器实现类都要实现 org.apache.kafka.clients.producer.ProducerInterceptor接口,该接口提供了两个核心方法:
-
onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。
-
onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
消费者端-拦截器
Consumer 端拦截器实现类都要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,该接口提供两个核心方法:
- onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你
- onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
使用场景:
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
标签:逻辑,拦截器,interceptors,kafka,调用,方法 来源: https://www.cnblogs.com/shix0909/p/16579550.html