编程语言
首页 > 编程语言> > python – 实现luigi动态图配置

python – 实现luigi动态图配置

作者:互联网

我是luigi的新手,在为我们的ML工作设计管道时遇到过它.虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合.

基本上我正在寻找的是一种能够持久保存自定义构建管道并因此使其结果可重复且易于部署的方法,在阅读了大多数在线教程后,我尝试使用现有的luigi.cfg配置实现我的序列化.命令行机制,它可能已经足够的任务的参数,但它没有提供序列化我的管道的DAG连接,所以我决定有一个WrapperTask接收一个json配置文件,然后创建所有任务实例和连接luigi任务的所有输入输出通道(做所有管道).

我特此附上一个小测试程序供您审查:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes 
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)

            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()  

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

因此,基本上,如问题标题中所述,这侧重于动态依赖关系并生成具有p = 1/35连通概率的513节点依赖关系DAG,它还将All(如make all)类定义为WrapperTask,需要构建所有节点才能将其视为已完成(我的版本仅将其连接到连接的DAG组件的头部,但我不想过于复杂).

是否有更标准(Luigic)的实现方式?特别注意与TaskNode init和set_required方法并不那么复杂,我只是这样做,因为在init方法中接收参数会以某种方式与luigi注册参数的方式发生冲突.我还尝试了其他几种方法,但这基本上是最体面的(有效)

如果没有一种标准的方式,我仍然希望在完成框架实施之前,能够听到我计划进行的任何见解.

解决方法:

我昨天用answered a similar question进行了演示.我几乎完全基于example in the docs..在文档中,通过yeilding任务分配动态依赖关系似乎是他们喜欢的方式.

luigi.Config和动态依赖关系可能会为您提供几乎无限灵活性的管道.他们还描述了一个虚拟任务,它调用多个依赖链here,,它可以为您提供更多控制.

标签:python,python-3-x,data-pipeline,luigi
来源: https://codeday.me/bug/20190910/1801490.html