其他分享
首页 > 其他分享> > librdkafka-example-modeC

librdkafka-example-modeC

作者:互联网

/*结构体说明  
rd_kafka_toppar_s:topic & partition combination

*/
else if (mode == 'C') { /* * Consumer */ //初始化设置 rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, 0); /* Create Kafka handle */
//判断是否可以使用当前consumer
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); exit(1); } //当设置-s:watermark使用 if (get_wmarks) { int64_t lo, hi; /* Only query for hi&lo partition watermarks */ if ((err = rd_kafka_query_watermark_offsets( rk, topic, partition, &lo, &hi, 5000))) { fprintf(stderr, "%% query_watermark_offsets() " "failed: %s\n", rd_kafka_err2str(err)); exit(1); } printf( "%s [%d]: low - high offsets: " "%" PRId64 " - %" PRId64 "\n", topic, partition, lo, hi); rd_kafka_destroy(rk); exit(0); } /* Create topic */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); topic_conf = NULL; /* Now owned by topic */ /* Start consuming */
//判断是否可以消费
/*rd_kafka_consume_start
判断条件:
1.topic是否长度超限 是否为空
2.加线程锁
3.判断offset(最新,最小,上次消费的地方) 判断offset是否合法 */
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) { err = rd_kafka_last_error(); fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) fprintf(stderr, "%% Broker based offset storage " "requires a group.id, " "add: -X group.id=yourGroup\n"); exit(1); } /*

*/ while (run) { rd_kafka_message_t *rkmessage; /* Poll for errors, etc. */ rd_kafka_poll(rk, 0); /* Consume single message. * See rdkafka_performance.c for high speed * consuming of messages. */ rkmessage = rd_kafka_consume(rkt, partition, 1000); if (!rkmessage) /* timeout */ continue; msg_consume(rkmessage, NULL); /* Return message to rdkafka */ rd_kafka_message_destroy(rkmessage); if (seek_offset) { err = rd_kafka_seek(rkt, partition, seek_offset, 2000); if (err) printf("Seek failed: %s\n", rd_kafka_err2str(err)); else printf("Seeked to %" PRId64 "\n", seek_offset); seek_offset = 0; } } /* Stop consuming */ rd_kafka_consume_stop(rkt, partition); while (rd_kafka_outq_len(rk) > 0) rd_kafka_poll(rk, 10); /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy handle */ rd_kafka_destroy(rk); }



标签:topic,partition,librdkafka,kafka,rd,modeC,offset,example,rk
来源: https://www.cnblogs.com/supermanwx/p/16034906.html