其他分享
首页 > 其他分享> > Kafka高级API和低级API

Kafka高级API和低级API

作者:互联网

 

Kafka消费过程分析

kafka提供了两套consumer API:高级Consumer API和低级API。

高级API

1)高级API优点

高级API 写起来简单

不需要自行管理offset,系统通过zookeeper自动管理

不需要管理分区,副本等情况,系统自动管理

消费者断线后会自动根据上一次记录在zookeeper中的offset接着获取数据(默认设置1分钟更新一次zookeeper中存储的offset)

可以使用group来区分访问同一个topic的不同程序(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)

2)高级API缺点

不能自行控制offset(对于某些特殊需求来说)

不能细化控制如分区、副本、zk等

低级API

1)低级 API 优点

能够让开发者自己控制offset,想从哪里读取就从哪里读取。

自行控制连接分区,对分区自定义进行负载均衡

对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)

2)低级API缺点

太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。

高级producer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

package com.sinoiov.kafka.test;

 

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

 

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

 

/**

 * Created by caoyu on 16/4/21.

 * By 中交兴路 大数据中心-基础平台部

 */

public class Kafka_produce extends Thread{

    private String topic;

    private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd hh:mm:ss");

 

    public Kafka_produce(String topic){

        super();

        this.topic = topic;

    }

 

    @Override

    public void run() {

        Producer<String, String> producer = createProducer();

        long i = 0;

        while(true){

            i++;

            long now = System.currentTimeMillis();

            KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic,sdf.format(new Date(now))+"_"+i+"");

            producer.send(message);

            try {

                Thread.sleep(1000);

            catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

 

    private Producer<String,String> createProducer(){

        Properties properties = new Properties();

        properties.put("metadata.broker.list","192.168.110.81:9092,192.168.110.82:9092,192.168.110.83:9092");

        properties.put("serializer.class", StringEncoder.class.getName());

        properties.put("zookeeper.connect""nnn1:2181,nnn2:2181,nslave1:2181");

        return new Producer<String, String>(new ProducerConfig(properties));

    }

}

高级consumer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

package com.sinoiov.kafka.test;

 

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

 

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

 

/**

 * Created by caoyu on 16/4/21.

 * By 中交兴路 大数据中心-基础平台部

 */

public class Kafka_consumer extends Thread {

    private String topic;

    private ConsumerConnector consumer;

 

    public Kafka_consumer(String topic){

        super();

        this.topic = topic;

        consumer = createConsumer();

    }

 

    public void shutDown(){

        if(consumer != null)

            consumer.shutdown();

    }

 

    @Override

    public void run() {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageSteam = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> steam = messageSteam.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = steam.iterator();

        while(iterator.hasNext()){

            String message = new String(iterator.next().message());

            System.out.println(message);

        }

    }

 

    private ConsumerConnector createConsumer(){

        Properties properties = new Properties();

        properties.put("zookeeper.connect","nnn1:2181,nnn2:2181,nslave1:2181");

        properties.put("group.id""testsecond");

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

    }

}

同样,代码也不复杂,基本都能参考看懂。

高级 API 的特点

优点

缺点

低级 API 的特点

优点

缺点

 

低级 API 示例代码

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

package com.sinoiov.kafka.test;

 

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.cluster.BrokerEndPoint;

import kafka.common.ErrorMapping;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.message.MessageAndOffset;

 

/**

 * Created by caoyu on 16/4/26.

 * By 中交兴路 大数据中心-基础平台部

 */

public class SimpleExample {

    private List<String> m_replicaBrokers = new ArrayList<String>();

 

    public SimpleExample() {

        m_replicaBrokers = new ArrayList<String>();

    }

 

    public static void main(String args[]) {

        SimpleExample example = new SimpleExample();

        // 最大读取消息数量

        long maxReads = Long.parseLong("3");

        // 要订阅的topic

        String topic = "test1";

        // 要查找的分区

        int partition = Integer.parseInt("0");

        // broker节点的ip

        List<String> seeds = new ArrayList<String>();

        seeds.add("192.168.110.81");

        seeds.add("192.168.110.82");

        seeds.add("192.168.110.83");

        // 端口

        int port = Integer.parseInt("9092");

        try {

            example.run(maxReads, topic, partition, seeds, port);

        catch (Exception e) {

            System.out.println("Oops:" + e);

            e.printStackTrace();

        }

    }

 

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {

        // 获取指定Topic partition的元数据

        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);

        if (metadata == null) {

            System.out.println("Can't find metadata for Topic and Partition. Exiting");

            return;

        }

        if (metadata.leader() == null) {

            System.out.println("Can't find Leader for Topic and Partition. Exiting");

            return;

        }

        String leadBroker = metadata.leader().host();

        String clientName = "Client_" + a_topic + "_" + a_partition;

 

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 10000064 1024, clientName);

        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;

        while (a_maxReads > 0) {

            if (consumer == null) {

                consumer = new SimpleConsumer(leadBroker, a_port, 10000064 1024, clientName);

            }

            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();

            FetchResponse fetchResponse = consumer.fetch(req);

 

            if (fetchResponse.hasError()) {

                numErrors++;

                // Something went wrong!

                short code = fetchResponse.errorCode(a_topic, a_partition);

                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);

                if (numErrors > 5)

                    break;

                if (code == ErrorMapping.OffsetOutOfRangeCode()) {

                    // We asked for an invalid offset. For simple case ask for

                    // the last element to reset

                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);

                    continue;

                }

                consumer.close();

                consumer = null;

                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);

                continue;

            }

            numErrors = 0;

 

            long numRead = 0;

            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {

                long currentOffset = messageAndOffset.offset();

                if (currentOffset < readOffset) {

                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);

                    continue;

                }

 

                readOffset = messageAndOffset.nextOffset();

                ByteBuffer payload = messageAndOffset.message().payload();

 

                byte[] bytes = new byte[payload.limit()];

                payload.get(bytes);

                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " new String(bytes, "UTF-8"));

                numRead++;

                a_maxReads--;

            }

 

            if (numRead == 0) {

                try {

                    Thread.sleep(1000);

                catch (InterruptedException ie) {

                }

            }

        }

        if (consumer != null)

            consumer.close();

    }

 

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

        OffsetResponse response = consumer.getOffsetsBefore(request);

 

        if (response.hasError()) {

            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));

            return 0;

        }

        long[] offsets = response.offsets(topic, partition);

        return offsets[0];

    }

 

    /**

     * @param a_oldLeader

     * @param a_topic

     * @param a_partition

     * @param a_port

     * @return String

     * @throws Exception

     *             找一个leader broker

     */

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {

        for (int i = 0; i < 3; i++) {

            boolean goToSleep = false;

            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);

            if (metadata == null) {

                goToSleep = true;

            else if (metadata.leader() == null) {

                goToSleep = true;

            else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {

                // first time through if the leader hasn't changed give

                // ZooKeeper a second to recover

                // second time, assume the broker did recover before failover,

                // or it was a non-Broker issue

                //

                goToSleep = true;

            else {

                return metadata.leader().host();

            }

            if (goToSleep) {

                try {

                    Thread.sleep(1000);

                catch (InterruptedException ie) {

                }

            }

        }

        System.out.println("Unable to find new leader after Broker failure. Exiting");

        throw new Exception("Unable to find new leader after Broker failure. Exiting");

    }

 

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {

        PartitionMetadata returnMetaData = null;

        loop: for (String seed : a_seedBrokers) {

            SimpleConsumer consumer = null;

            try {

                consumer = new SimpleConsumer(seed, a_port, 10000064 1024"leaderLookup");

                List<String> topics = Collections.singletonList(a_topic);

                TopicMetadataRequest req = new TopicMetadataRequest(topics);

                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 

                List<TopicMetadata> metaData = resp.topicsMetadata();

                for (TopicMetadata item : metaData) {

                    for (PartitionMetadata part : item.partitionsMetadata()) {

                        if (part.partitionId() == a_partition) {

                            returnMetaData = part;

                            break loop;

                        }

                    }

                }

            catch (Exception e) {

                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);

            finally {

                if (consumer != null)

                    consumer.close();

            }

        }

        if (returnMetaData != null) {

            m_replicaBrokers.clear();

            for (BrokerEndPoint replica : returnMetaData.replicas()) {

                m_replicaBrokers.add(replica.host());

            }

        }

        return returnMetaData;

    }

}

标签:offset,kafka,topic,API,new,import,低级,consumer,Kafka
来源: https://blog.csdn.net/u011250186/article/details/115674130