其他分享
首页 > 其他分享> > 如何使用Luigi连续更新目标文件?

如何使用Luigi连续更新目标文件?

作者:互联网

我最近开始玩Luigi,我想知道如何使用它来不断地将新数据附加到现有的目标文件中.

想象一下,我每分钟都在ping一个api来检索新数据.因为任务仅在目标尚不存在时运行,所以一种天真的方法是通过当前日期时间参数化输出文件.这是一个简单的例子:

import luigi
import datetime

class data_download(luigi.Task):
    date = luigi.DateParameter(default = datetime.datetime.now()) 

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("data_test_%s.json" % self.date.strftime("%Y-%m-%d_%H:%M"))

    def run(self):
        data = download_data()
        with self.output().open('w') as out_file:
            out_file.write(data + '\n')

if __name__ == '__main__':
    luigi.run()

如果我安排此任务每分钟运行一次,它将执行,因为当前时间的目标文件尚不存在.但它每分钟创建60个文件.我想要做的是确保所有新数据最终都在同一个文件中.实现这一目标的可扩展方法是什么?欢迎任何想法,建议!

解决方法:

你不能.正如LocalTarget的doc所说:

Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.

即只允许使用r或w模式.其他选项,例如要求扩展LocalTarget类;尽管它打破了Luigi任务执行所需的幂等性.

标签:python,luigi
来源: https://codeday.me/bug/20190705/1391964.html