
java - How to connect Apache Kafka with Amazon S3?


I want to use Kafka Connect to store data from Kafka in an S3 bucket. I already have a Kafka topic running and have created an S3 bucket. My topic contains Protobuf data. I tried using https://github.com/qubole/streamx and got the following error:

 [2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
 [2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
 [2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
 [2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
 [2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
 java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)

These are the steps I followed:

> I cloned the repository.
> mvn -DskipTests package
> nano config/connect-standalone.properties

bootstrap.servers=ip-myip.ec2.internal:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

> nano config/quickstart-s3.properties

name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=ssp.impressions
flush.size=3
s3.url=s3://myaccess_key:mysecret_key@mybucket/demo

> connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties

I would like to know whether what I am doing is correct, or whether there is another way to persist data from Kafka to S3.

Solution:

You can use Kafka Connect for this integration, with the Kafka Connect S3 connector.

Kafka Connect is part of Apache Kafka, and the S3 connector is an open-source connector available either standalone or as part of Confluent Platform.
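As a rough sketch of what a configuration for that connector could look like for this topic (the bucket name and region below are placeholders, and ByteArrayFormat is chosen on the assumption that the raw Protobuf bytes should be written to S3 unmodified):

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=ssp.impressions
s3.bucket.name=mybucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
flush.size=3

Unlike streamx's s3.url, this connector takes the bucket and region as separate properties and resolves AWS credentials through the standard AWS provider chain (for example the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables), so no keys need to be embedded in the configuration.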

For general information about Kafka Connect and worked examples, this series of articles may help:

> https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
> https://www.confluent.io/blog/the-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
> https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
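If the goal is to land the Protobuf payloads in S3 untouched, the worker can also use the ByteArrayConverter that ships with Apache Kafka itself, rather than the streamx one (a sketch, to pair with the ByteArrayFormat above):

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

The connector is then started the same way as before, e.g. connect-standalone worker.properties s3-sink.properties, where the two file names stand in for your worker and connector configuration files.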

Disclaimer: I work for Confluent and wrote the above blog posts.

Tags: apache-kafka-connect, amazon-s3, apache-kafka, java