编程语言
首页 > 编程语言> > java – 无法在IDE中删除Kafka Stream Application的状态目录

java – 无法在IDE中删除Kafka Stream Application的状态目录

作者:互联网

我正在开发一个简单的Kafka Stream应用程序,它从主题中提取消息并在转换后将其放入另一个主题.我正在使用Intelij进行开发.

当我调试/运行这个应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,它将完美运行

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and
SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)

但是,当我尝试使用另一台机器进行开发时

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and
SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 where XXX.XXX.XXX is the
ip address of my Kafka),

调试过程第一次运行没有问题.但是,当我重置偏移后第二次运行时,我收到以下错误:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

如果我将my_application_id更改为my_application_id2并运行它,它会在第一次再次运行但如果我再次运行它会再次收到错误.

我的申请中的最后一句话中有以下代码:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

有什么建议如何解决这个问题?

更新:

我已经查看了在我的开发机器(Windows平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,则没有发现错误.我试图以管理员身份运行我的IDE,因为我认为这可能是关于该文件夹的权限.但是,这没有用.

完整堆栈供参考:

INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109)
INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110)
INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281)
Disconnected from the target VM, address: ‘127.0.0.1:16552’, transport: ‘socket’
Exception in thread “main” org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231)
at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)
at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)
Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
at java.nio.file.Files.walkFileTree(Files.java:2688)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
… 3 more
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
at java.nio.file.Files.walkFileTree(Files.java:2688)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)
at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)
at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)

更新2:
经过另一次详细检查,下面的行抛出IOException

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

此行位于kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

可能这是Windows系统的问题(对不起,我不是一个经验丰富的JAVA程序员).

解决方法:

对于谷歌..

我目前正在使用此Scala代码来帮助Windows人员处理状态存储的删除.

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}

标签:apache-kafka-streams,java,apache-kafka,windows
来源: https://codeday.me/bug/20191008/1871010.html