编程语言
首页 > 编程语言> > Python Luigi – 满意时继续执行外部任务

Python Luigi – 满意时继续执行外部任务

作者:互联网

我正在研究一个Luigi管道,它检查是否存在手动创建的文件,如果存在,继续执行下一个任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

我想要的是,在创建手册文件并将其粘贴到路径中后,luigi继续.当我这样做时,它不是找到文件并继续执行任务,而是每隔几秒重新检查一个新任务:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

经过相当长的时间(15-20分钟左右),luigi将找到该文件,然后它可以按照需要继续.我该怎么做才能防止这种延迟?我希望luigi在文件存在后立即继续.

解决方法:

要记住以下几点:

> Luigi工作线程将不会退出,直到至少有一个任务在运行(或者如果keep_alive = True,在这种情况下,当没有更多待处理任务时它将退出).
>失败的任务有重试逻辑,默认重试间隔为15分钟.
>重试逻辑的工作原理如下.在指定的重试间隔后,调度程序将忘记任务的失败(与单击UI中的“原谅失败”按钮相同),并将任务的状态更改为挂起.下一次工作人员要求调度程序工作时,可以将此任务分配给工作人员.
>不完整的外部任务计为FAILED,受重试逻辑限制.
>外部任务的重试行为由[worker]部分中的retry_external_tasks配置设置控制.

我认为你所观察到的是这样的.您的管道正在运行,任务ProcessExternalFile失败,然后您添加文件,任务在retry_delay期间保持FAILED,然后最终它变为PENDING并且工作人员再次被赋予此任务,此时它发现文件和任务变得完整.

这是否是您期望的行为取决于您.如果希望更快地找到该文件,则可以更改重试间隔.或者你可以在run方法中进行无限循环,并定期检查文件,并在找到时跳出循环.您还可以将Luigi配置为完全禁用重试逻辑.

标签:python,luigi
来源: https://codeday.me/bug/20190628/1310972.html