Kafka
作者:互联网
1、命令:
启动zookeeper:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka:
bin\windows\kafka-server-start.bat config\server.properties
创建topic:
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
或
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建生产者:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
创建消费者:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
展示主题信息(副本,ISR等)
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic test
查看所有主题
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list
2、生产者操作
重要参数:
buffer.memory //RecordAccumulator消息收集器的缓存大小,默认32MB。超过会抛出异常或阻塞,取决于max.block.ms参数
max.block.ms //缓存溢出后,阻塞时间。默认60秒。
retries //重发次数,
retry.backoff.ms //重发间隔时间
batch.size //缓存参数,决定创建ProducerBatch的大小,关于ProducerBatch在P38。
acks //1:leader收到消息即为成功,0:生产者发送消息后不等待任何响应 ,-1/all:leader和follower全部都写入消息才为成功
(1)、生产者添加消息:
public class Kafka { private static KafkaProducer<String,String> producer; public static void main(String[] args) { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("acks", "all"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String,String>(kafkaProps); // consumer=new KafkaConsumer<>(kafkaProps); ProducerRecord<String, String> record = new ProducerRecord<>("test", "444"); try { System.out.println(producer.send(record).get()); List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ; partitions = producer.partitionsFor("test"); for(PartitionInfo p:partitions) { System.out.println(p); } }catch (Exception e) { e.printStackTrace(); } } }
(2)、生产者回调函数
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.out.println("回调函数!" + arg0.topic() + " " + arg1); } });
注:当执行成功时回调函数的Exception是null.
(3)、生产者拦截器
第一步、创建拦截器类
public class MyInterceptor implements ProducerInterceptor<String, String>{ @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public void onAcknowledgement(RecordMetadata arg0, Exception arg1) { // TODO Auto-generated method stub System.out.println(""); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> arg0) { System.out.println("现在是拦截器"+arg0.topic()); if(arg0.value().contains("wgy")) { return new ProducerRecord<String,String>(arg0.topic(),arg0.partition(),arg0.timestamp(),arg0.key(),"we is good"); } return arg0; } }
第二步、在生产者配置中配置
kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName());
可以配置多个拦截器,多个拦截器之间使用逗号隔开,例如:
kafkaProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class.getName()+","+MyInterceptor.class.getName() );
消费者操作
重要参数
(1)、消费者读取消息:
public class KafkaRead { private static KafkaConsumer<String,String> consumer; public static void main(String[] args) { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer=new KafkaConsumer<>(kafkaProps); consumer.subscribe(java.util.Collections.singletonList("test")); try { while(true) { ConsumerRecords<String, String> records=consumer.poll(100); for(ConsumerRecord<String, String> record:records) { System.out.println(record.toString()); } } }catch (Exception e) { // TODO: handle exception } } }
(5)、
标签:--,kafka,arg0,kafkaProps,put,Kafka,public 来源: https://www.cnblogs.com/wanglala/p/16062480.html