
Kafka

Author: Internet

1. Commands:

Start ZooKeeper:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Start Kafka:

bin\windows\kafka-server-start.bat config\server.properties

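Create a topic (--bootstrap-server takes the broker address, port 9092 by default, not the ZooKeeper port 2181):

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Or, on older versions that still accept --zookeeper (the option was removed in Kafka 3.0):

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test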

Start a console producer:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

Start a console consumer:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

Show topic details (replicas, ISR, etc.):

bin\windows\kafka-topics.bat  --zookeeper localhost:2181 --describe --topic test

 

List all topics:

bin\windows\kafka-topics.bat  --zookeeper localhost:2181 --list

 

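2. Producer operations

Important parameters:

buffer.memory  // Size of the RecordAccumulator buffer that collects records before they are sent; default 32 MB. When the buffer is full, send() blocks for up to max.block.ms and then throws an exception.

max.block.ms  // Maximum time send() blocks when the buffer is full; default 60 seconds.

retries     // Number of times a failed send is retried.

retry.backoff.ms  // Interval between retries.

batch.size   // Buffer parameter that determines the size of each ProducerBatch; for more on ProducerBatch see P38.

acks      // 1: the send succeeds once the leader has received the message; 0: the producer does not wait for any response after sending; -1/all: the send succeeds only once the leader and all in-sync followers have written the message.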
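As a rough illustration of how the parameters above are set, the sketch below puts them into a java.util.Properties object under their plain string keys. It uses only the JDK. The class and method names (ProducerConfigSketch, buildProducerProps) are made up for this example, the retries value of 3 is an arbitrary choice, and the other numeric values simply write the client defaults out explicitly.

```java
import java.util.Properties;
import java.util.TreeSet;

public class ProducerConfigSketch {
    /** Builds producer settings using the string keys listed above (hypothetical helper). */
    static Properties buildProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "33554432"); // 32 MB, the default
        props.put("max.block.ms", "60000");     // 60 seconds, the default
        props.put("retries", "3");              // arbitrary illustrative value (assumption)
        props.put("retry.backoff.ms", "100");   // 100 ms between retries
        props.put("batch.size", "16384");       // 16 KB per batch
        props.put("acks", "all");               // wait for all in-sync replicas
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps();
        // Print the settings in alphabetical order for easy inspection
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way kafkaProps is in section (1).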

 

 

(1) Sending a message from a producer:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class Kafka {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("acks", "all");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which also flushes buffered records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "444");
            // send() is asynchronous; get() blocks until the broker acknowledges the record
            System.out.println(producer.send(record).get());
            // Print the partition metadata of the topic
            List<PartitionInfo> partitions = producer.partitionsFor("test");
            for (PartitionInfo p : partitions) {
                System.out.println(p);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

 

(2) Producer callback

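producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed, so do not rely on the metadata here
            System.out.println("Callback: send failed: " + exception);
        } else {
            System.out.println("Callback: " + metadata.topic() + "   offset " + metadata.offset());
        }
    }
});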

 

Note: when the send succeeds, the Exception passed to the callback is null.

 

(3) Producer interceptors

Step 1: Create the interceptor class

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public void close() {
        // No resources to release
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the record is acknowledged or the send fails; nothing to do here
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("In interceptor: " + record.topic());
        // Replace the value of any record whose value contains "wgy"
        if (record.value() != null && record.value().contains("wgy")) {
            return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), "we is good");
        }
        return record;
    }
}

 

Step 2: Register it in the producer configuration

kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName());

 

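Multiple interceptors can be configured; separate their class names with commas. They are invoked in the order listed. For example: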

        kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                MyInterceptor.class.getName()+","+MyInterceptor.class.getName()
                );
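To make the ordering concrete, here is a small plain-Java model of a chain, not Kafka's own classes: each step receives whatever the previous step returned, which is how the onSend calls of several interceptors are chained. The names InterceptorChainSketch, applyInOrder, replaceWgy and addPrefix are hypothetical, and the "[checked] " prefix is an invented second interceptor used only to show that the order changes the result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {
    /** Applies each step in list order, feeding each one the previous step's result. */
    static String applyInOrder(List<UnaryOperator<String>> chain, String value) {
        String current = value;
        for (UnaryOperator<String> step : chain) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the onSend logic of two interceptors
        UnaryOperator<String> replaceWgy = v -> v.contains("wgy") ? "we is good" : v;
        UnaryOperator<String> addPrefix = v -> "[checked] " + v;

        // replaceWgy first, then addPrefix
        System.out.println(applyInOrder(Arrays.asList(replaceWgy, addPrefix), "hello wgy"));
        // addPrefix first, then replaceWgy
        System.out.println(applyInOrder(Arrays.asList(addPrefix, replaceWgy), "hello wgy"));
    }
}
```

With replaceWgy first the output keeps the prefix ("[checked] we is good"); with the order reversed the prefix is overwritten ("we is good"), so the order of the class names in the configuration value matters.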

 

 

3. Consumer operations

Important parameters

 

 

(1) Reading messages with a consumer:

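import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaRead {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // poll(Duration) (Kafka 2.0+) replaces the deprecated poll(long); wait up to 100 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it
            e.printStackTrace();
        }
    }
}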

 


Source: https://www.cnblogs.com/wanglala/p/16062480.html