其他分享
首页 > 其他分享> > FATE详细介绍

FATE详细介绍

作者:互联网

FATE简介

FATE(Federated AI Technology Enabler)是微众银行AI部门发起的一个开源项目,旨在提供一个安全的计算框架来支持联邦AI生态系统。它实现了基于同态加密和多方计算(MPC)的安全计算协议。它支持联邦学习架构和各种机器学习算法的安全计算,包括逻辑回归、基于树的算法、深度学习和迁移学习。

FATE技术框架

FederatedML

​ 算法功能组件,包括常见机器学习算法联邦化实现。所有模块均采用模块化的解耦的方式进行开发,从而增强可扩展性。

FATE_Flow

​ FATE-Flow是联邦学习框架FATE的作业调度系统,实现联邦学习作业生命周期的完整管理,其中包括数据输入、训练作业调度、指标追踪、模型中心等功能。

FATE-Board

​ 联邦学习建模的可视化工具,为终端用户可视化和度量模型训练的全过程。支持对模型训练过程全流程的跟踪、统计和监控等,并为模型运行状态、模型输出、日志追踪等提供了丰富的可视化呈现,帮助用户简单而高效地深入探索模型与理解模型。

FATE-Serving

​ 高性能可扩展的联邦学习在线模型服务。

角色

Guest

​ Guest表示数据应用方,在纵向算法中,Guest往往是有标签y的一方。一般是由Guest发起建模流程。

Host

​ Host是数据提供方。

arbiter

​ arbiter是用来辅助多方完成联合建模的,主要的作用是用来聚合梯度或者模型,比如纵向lr里面,各方将自己一半的梯度发送给arbiter,然后arbiter再联合优化等等,arbiter还参与以及分发公私钥,进行加解密服务等等。

FATE环境部署指南

单机部署

参考:https://fate.readthedocs.io/en/latest/_build_temp/standalone-deploy/README.html#install-fate-using-docker-recommended,包括使用 Docker 安装 FATE*(推荐)*在主机中安装 FATE

集群部署

参考:https://fate.readthedocs.io/en/latest/_build_temp/cluster-deploy/README.html

主机进入到docker FATE镜像命令

主机ip:192.168.1.75

FATE版本为1.6.0,主机执行以下命令

CONTAINER_ID=`docker ps -aqf "name=fate"`
docker exec -t -i ${CONTAINER_ID} bash

快速启动 FATE 的一般指南

参考:https://fate.readthedocs.io/en/latest/_build_temp/examples/pipeline/README.html

由于FATE已经启动,以下命令不需要执行

  1. (可选)创建虚拟环境

    python -m venv venv
    source venv/bin/activate
    pip install -U pip
    
  2. 安装FATE客户端

    pip install fate_client
    pipeline init --help
    
  3. 提供部署的 FATE-Flow 的服务器 ip/port 信息

    # fate-flow server初始化 pipeline的ip和端口. 默认 ip:port 是127.0.0.1:8080.
    pipeline init --ip 127.0.0.1 --port 9380
    # 可选,设置Pipeline目录
    pipeline init --ip 127.0.0.1 --port 9380 --log-directory {desired log path}
    
  4. 使用 FATE-Pipeline 上传数据

    在开始建模任务之前,应上传要使用的数据。通常,一方通常是包含多个节点的集群。因此,当我们上传这些数据时,数据将分配给这些节点。

    参考此文档测试用例1内容。

FATE使用预备知识

上传数据指南

参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/upload_data_guide_zh.html

接受的数据类型

Dense、svm-light、tag、tag:value

定义上传数据配置文件

{
  "file": "examples/data/breast_hetero_guest.csv",
  "table_name": "hetero_breast_guest",
  "namespace": "experiment",
  "head": 1,
  "partition": 8,
  "work_mode": 0,
  "backend": 0
}

字段说明:

  1. file: 文件路径

  2. table_name&namespace: 存储数据表的标识符号

  3. head: 指定数据文件是否包含表头

  4. partition: 指定用于存储数据的分区数

  5. work_mode: 指定工作模式,0代表单机版,1代表集群版

  6. backend: 指定后端,0代表EGGROLL, 1代表SPARK加RabbitMQ, 2代表SPARK加Pulsar

上传命令

每个提供数据的集群(即guest和host)都需执行此步骤

1.使用fate-flow上传数据:

flow data upload -c dsl_test/upload_data.json

upload_data.json内容:

{
    "file": "examples/data/breast_hetero_guest.csv",
    "head": 1,
    "partition": 16,
    "work_mode": 0,
    "table_name": "breast_hetero_guest",
    "namespace": "experiment"
}

2.使用旧版python脚本上传数据:

python /fate/python/fate_flow/fate_flow_client.py -f upload -c /fate/examples/dsl/v1/upload_data.json

3.使用python脚本上传数据:参考测试用例1

任务配置与运行配置(DSL & Task Submit Runtime Conf Setting)V2

​ 为了让任务模型的构建更加灵活,目前 FATE 使用了一套自定的领域特定语言 (DSL) 来描述任务。在 DSL 中,各种模块(例如数据读写 data_io,特征工程 feature-engineering, 回归 regression,分类 classification)可以通向一个有向无环图 (DAG) 组织起来。通过各种方式,用户可以根据自身的需要,灵活地组合各种算法模块。

​ 除此之外,每个模块都有不同的参数需要配置,不同的 party 对于同一个模块的参数也可能有所区别。为了简化这种情况,对于每一个模块,FATE 会将所有 party 的不同参数保存到同一个运行配置文件(Submit Runtime Conf)中,并且所有的 party 都将共用这个配置文件。这个指南将会告诉你如何创建一个 DSL 配置文件。

V2配置参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html

DSL配置说明

1.概述

​ DSL 的配置文件采用 json 格式,实际上,整个配置文件就是一个 json 对象 (dict)。

2.Components

​ 在这个 dict 的第一级是 “components”,用来表示这个任务将会使用到的各个模块,每个独立的模块定义在 “components” 之下,所有数据需要通过Reader模块从数据存储拿取数据,注意Reader模块仅有输出output

3.module

​ 用来指定使用的模块。模块名参考FATE ML算法列表(https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html),与/fate/python/federatedml/conf/setting_conf 下各个模块的文件名保持一致(不包括 .json 后缀)。

4.input

​ 分为两种输入类型,分别是 Data和 Model。

Data输入

分为三种输入类型:

  1. data: 一般被用于 data_io 模块, feature_engineering 模块或者 evaluation 模块
  2. train_data: 一般被用于 homo_lr, hetero_lr 和 secure_boost 模块。如果出现了 train_data 字段,那么这个任务将会被识别为一个 fit 任务
  3. validate_data: 如果存在 train_data 字段,那么该字段是可选的。如果选择保留该字段,则指向的数据将会作为 validation set
  4. test_data: 用作预测数据,如提供,需同时提供model输入。
Model输入

分为两种输入类型

​ 1.model: 用于同种类型组件的模型输入。

​ 2.isometric_model: 用于指定继承上游组件的模型输入

5.output
数据输出

分为四种输出类型:

  1. data: 常规模块数据输出
  2. train_data: 仅用于Data Split
  3. validate_data: 仅用于Data Split
  4. test_data: 仅用于Data Split
模型输出

仅使用model

DSL配置示例

训练模式,用户可以使用其他算法模块替代HeteroSecureBoost,注意模块名hetero_secureboost_0也要一起更改

 {
    "components": {
        "reader_0": {
            "module": "Reader",
            "output": {
                "data": [
                    "data"
                ]
            }
        },
        "dataio_0": {
            "module": "DataIO",
            "input": {
                "data": {
                    "data": [
                        "reader_0.data"
                    ]
                }
            },
            "output": {
                "data": [
                    "data"
                ],
                "model": [
                    "model"
                ]
            }
        },
        "intersection_0": {
            "module": "Intersection",
            "input": {
                "data": {
                    "data": [
                        "dataio_0.data"
                    ]
                }
            },
            "output": {
                "data": [
                    "data"
                ]
            }
        },
        "hetero_secureboost_0": {
            "module": "HeteroSecureBoost",
            "input": {
                "data": {
                    "train_data": [
                        "intersection_0.data"
                    ]
                }
            },
            "output": {
                "data": [
                    "data"
                ],
                "model": [
                    "model"
                ]
            }
        },
        "evaluation_0": {
            "module": "Evaluation",
            "input": {
                "data": {
                    "data": [
                        "hetero_secureboost_0.data"
                    ]
                }
            },
            "output": {
                "data": [
                    "data"
                ]
            }
        }
    }
}

创建配置文件Submit Runtime Conf

针对1.5.x版本新格式,Job Runtime Conf用于设置各个参与方的信息, 作业的参数及各个组件的参数。 内容包括如下

1. DSL版本

配置版本,默认不配置为1,建议配置为2

"dsl_version": "2"
2. Job Participants(作业参与方)

用户需要定义 initiator。

1.发起方,包括:任务发起方的role和party_id,例如:

"initiator": {
    "role": "guest",
    "party_id": 9999
}

2.所有参与方:包含各参与方信息, 在 role 字段中,每一个元素代表一种角色以及承担这个角色的 party_id。每个角色的 party_id 以列表形式存在,因为一个任务可能涉及到多个 party 担任同一种角色。例如:

"role": {
    "guest": [9999],
    "host": [10000],
    "arbiter": [10000]
}
3.algorithm_parameters(系统运行参数)

配置作业运行时的主要系统参数

参数应用范围策略设置

其中common下的参数应用于所有参与方,role-guest-0配置下的参数应用于guest角色0号下标的参与方 .注意,当前版本系统运行参数未对仅应用于某参与方做严格测试,建议优先选用common

支持的系统参数
配置项默认值支持值说明
job_typetraintrain, predict任务类型
work_mode00, 10代表单方单机版,1代表多方分布式版本
backend00, 1, 20代表EGGROLL,1代表SPARK加RabbitMQ,2代表SPARK加Pulsar
model_id--模型id,预测任务需要填入
model_version--模型version,预测任务需要填入
task_cores4正整数作业申请的总cpu核数
task_parallelism1正整数task并行度
computing_partitionstask所分配到的cpu核数正整数计算时数据表的分区数
eggroll_runprocessors_per_node等eggroll计算引擎相关配置参数,一般无须配置,由task_cores自动计算得到,若配置则task_cores参数不生效
spark_runnum-executors, executor-cores等spark计算引擎相关配置参数,一般无须配置,由task_cores自动计算得到,若配置则task_cores参数不生效
rabbitmq_runqueue, exchange等rabbitmq创建queue、exchange的相关配置参数,一般无须配置,采取系统默认值
pulsar_runproducer, consumer等pulsar创建producer和consumer时候的相关配置,一般无需配置。
federated_status_collect_typePUSHPUSH, PULL多方运行状态收集模式,PUSH表示每个参与方主动上报到发起方,PULL表示发起方定期向各个参与方拉取
timeout259200 (3天)正整数任务超时时间,单位秒
  1. 三大类引擎具有一定的支持依赖关系,例如Spark计算引擎当前仅支持使用HDFS作为中间数据存储引擎
  2. work_mode + backend会自动依据支持依赖关系,产生对应的三大引擎配置computing、storage、federation
  3. 开发者可自行实现适配的引擎,并在runtime config配置引擎
未开放的参数

参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html#id17

参考配置

共有四种

  1. 使用eggroll作为backend,采取默认cpu分配计算策略时的配置

  2. 使用eggroll作为backend,采取直接指定cpu等参数时的配置

  3. 使用spark加rabbitMQ作为backend,采取直接指定cpu等参数时的配置

    "job_parameters": {
      "common": {
        "job_type": "train",
        "work_mode": 1,
        "backend": 1,
        "spark_run": {
          "num-executors": 1,
          "executor-cores": 2
        },
        "task_parallelism": 2,
        "computing_partitions": 8,
        "timeout": 36000,
        "rabbitmq_run": {
          "queue": {
            "durable": true
          },
          "connection": {
            "heartbeat": 10000
          }
        }
      }
    }
    
  4. 使用spark加pulsar作为backend

详情,参考https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html

资源管理详细说明

​ 1.5.0版本开始,为了进一步管理资源,fateflow启用更细粒度的cpu cores管理策略,去除早前版本直接通过限制同时运行作业个数的策略。

包括:总资源配置、运行资源计算、资源调度,详情参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html#id19

4. 组件运行参数

详情参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html#id23

参数应用范围策略设置
"commom": {
}

"role": {
  "guest": {
    "0": {
    }
  }
}

其中common配置下的参数应用于所有参与方,role-guest-0配置下的参数表示应用于guest角色0号下标的参与方 注意,当前版本组件运行参数已支持两种应用范围策略

参考配置

上述组件名称是在DSL配置文件中定义

"component_parameters": {
  "common": {
    "intersection_0": {
      "intersect_method": "raw",
      "sync_intersect_ids": true,
      "only_output_key": false
    },
    "hetero_lr_0": {
      "penalty": "L2",
      "optimizer": "rmsprop",
      "alpha": 0.01,
      "max_iter": 3,
      "batch_size": 320,
      "learning_rate": 0.15,
      "init_param": {
        "init_method": "random_uniform"
      }
    }
  },
  "role": {
    "guest": {
      "0": {
        "reader_0": {
          "table": {"name": "breast_hetero_guest", "namespace": "experiment"}
        },
        "dataio_0":{
          "with_label": true,
          "label_name": "y",
          "label_type": "int",
          "output_format": "dense"
        }
      }
    },
    "host": {
      "0": {
        "reader_0": {
          "table": {"name": "breast_hetero_host", "namespace": "experiment"}
        },
        "dataio_0":{
          "with_label": false,
          "output_format": "dense"
        }
      }
    }
  }
}
多Host 配置

包括多Host任务应在role下列举所有host信息、各host不同的配置应在各自对应模块下分别列举,详情参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/dsl_conf_v2_setting_guide_zh.html#host

预测任务配置

DSL V2不会自动为训练任务生成预测dsl。 用户需要首先使用Flow Client部署所需模型中模块。 详细命令说明请参考FATE-Flow document </fate/python/fate_client/flow_client/README_zh.rst 的deploy模块>

命令如下:

flow model deploy 
--model-id $model_id #模型ID 必填
--model-version $model_version #模型版本 必填
# --cpn-list ... 组件列表,非必要参数
5.FATE-FLOW运行job基本原理
  1. 提交作业后,fateflow获取job dsl与job config,存于数据库t_job表对应字段以及/fate/jobs/$jobid/目录,
  2. 解析job dsl与job config,依据合并参数生成细粒度参数(如上述所说的backend&work_mode对应会生成三个引擎参数), 以及处理参数默认值
  3. 将共同配置分发到各参与方并存储,依据参与方的实际信息,生成job_runtime_on_party_conf
  4. 每个参与方接收到任务时,均依据job_runtime_on_party_conf执行

$jobid目录包括文件:

job_dsl.json  job_runtime_conf.json  local  pipeline_dsl.json  train_runtime_conf.json

Submit Runtime Conf配置示例

训练、验证模型 dsl样例

{
    "dsl_version": 2,
    "initiator": {
        "role": "guest",
        "party_id": 9999
    },
    "role": {
        "host": [
            10000
        ],
        "guest": [
            9999
        ]
    },
    "job_parameters": {
        "job_type": "train",
        "work_mode": 0,
        "backend": 0,
        "computing_engine": "STANDALONE",
        "federation_engine": "STANDALONE",
        "storage_engine": "STANDALONE",
        "engines_address": {
            "computing": {
                "nodes": 1,
                "cores_per_node": 20
            },
            "federation": {
                "nodes": 1,
                "cores_per_node": 20
            },
            "storage": {
                "nodes": 1,
                "cores_per_node": 20
            }
        },
        "federated_mode": "SINGLE",
        "task_parallelism": 1,
        "computing_partitions": 4,
        "federated_status_collect_type": "PULL",
        "model_id": "guest-9999#host-10000#model",
        "model_version": "202108310831349550536",
        "eggroll_run": {
            "eggroll.session.processors.per.node": 4
        },
        "spark_run": {},
        "rabbitmq_run": {},
        "pulsar_run": {},
        "adaptation_parameters": {
            "task_nodes": 1,
            "task_cores_per_node": 4,
            "task_memory_per_node": 0,
            "request_task_cores": 4,
            "if_initiator_baseline": false
        }
    },
    "component_parameters": {
        "role": {
            "guest": {
                "0": {
                    "reader_0": {
                        "table": {
                            "name": "breast_hetero_guest",
                            "namespace": "experiment"
                        }
                    }
                }
            },
            "host": {
                "0": {
                    "reader_0": {
                        "table": {
                            "name": "breast_hetero_host",
                            "namespace": "experiment"
                        }
                    },
                    "dataio_0": {
                        "with_label": false
                    }
                }
            }
        },
        "common": {
            "dataio_0": {
                "with_label": true
            },
            "hetero_secureboost_0": {
                "task_type": "classification",
                "objective_param": {
                    "objective": "cross_entropy"
                },
                "num_trees": 5,
                "bin_num": 16,
                "encrypt_param": {
                    "method": "iterativeAffine"
                },
                "tree_param": {
                    "max_depth": 3
                }
            },
            "evaluation_0": {
                "eval_type": "binary"
            }
        }
    }
}

算法

FATE ML

FATE目前支持三种类型联邦学习算法:横向联邦学习、纵向联邦学习以及迁移学习。

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html

FederatedML 在联邦学习上实现许多常见的机器学习算法。所有模块均采用解耦模块化方法开发,以增强可扩展性。FederatedML 提供:

  1. 联合统计:PSI、Union(并集计算)、Pearson Correlation(皮尔逊系数) 等。
  2. 联合特征工程:包括联邦采样,联邦特征分箱,联邦特征选择等。
  3. 联合机器学习算法:包括横向和纵向的联邦LR, GBDT, DNN,迁移学习等。
  4. 模型评估:提供对二分类,多分类,回归评估,聚类评估,联邦和单边对比评估。
  5. 安全协议:提供了多种安全协议,以进行更安全的多方交互计算。

联邦机器学习框架:参考https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html#id4

算法清单

测试用例会用到的算法列表:

算法模块名描述数据输入数据输出模型输入模型输出
ReaderReader当输入数据的存储引擎当前计算引擎不支持时,会自动转存到FATE集群适配计算引擎的组件输出存储引擎;当输入数据的存储格式非FATE支持存储格式时,会自动转换格式,并存储到FATE集群的组件输出存储引擎用户原始存储数据转换后原始数据
DataIODataIO该组件将原始数据转换为Instance对象(FATE-v1.7后会逐步弃用,使用DataTransform)。Table,值为原始数据转换后的数据表,值为在 : federatedml/feature/instance.py 中定义的Data Instance的实例DataIO模型
DataTransform_DataTransform该组件将原始数据转换为Instance对象。Table,值为原始数据转换后的数据表,值为在 : federatedml/feature/instance.py 中定义的Data Instance的实例DataTransform模型
IntersectIntersection计算两方的相交数据集,而不会泄漏任何差异数据集的信息。主要用于纵向任务。Table两方Table中相交的部分Intersect模型
Hetero Secure BoostingHeteroSecureBoost通过多方构建纵向Secure Boost模块。Table,值为InstanceSecureBoost模型,由模型本身和模型参数组成

详细算法清单见:https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html#id2

安全协议

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html#id3

参数

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README_zh.html#module-federatedml.param

示例

详情见:https://fate.readthedocs.io/en/latest/_build_temp/examples/README.html

示例使用指南

即本文档的测试用例2,3

FATE-FLOW

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/README_zh.html

FATE-Flow是联邦学习框架FATE的作业调度系统,实现联邦学习作业生命周期的完整管理,其中包括数据输入、训练作业调度、指标追踪、模型中心等功能.

FATE-Flow关键点

架构

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/README_zh.html#id3

部署

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/README_zh.html#id4

用法

FATE-Flow Client命令行

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_client/flow_client/README_zh.html

在新版的FATE Flow命令行控制台中,将命令拆分成了多个类,包括 job, data, model, component 等等。所有的命令将有一个共有调用入口

[IN]
flow

[OUT]
Usage: flow [OPTIONS] COMMAND [ARGS]...

  Fate Flow Client

Options:
  -h, --help  Show this message and exit.

Commands:
  component   Component Operations
  data        Data Operations
  job         Job Operations
  model       Model Operations
  queue       Queue Operations
  table       Table Operations
  task        Task Operations

FATE-Flow 命令行接口参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/doc/fate_flow_cli.html

python fate_flow_client.py -f $command
# 
#cd /fate/python/fate_flow/
#python fate_flow_client.py -f submit_job -c /fate/python/fate_flow/examples/test_hetero_lr_job_conf.json -d /fate/python/fate_flow/examples/test_hetero_lr_job_dsl.json

FATE-Flow Client SDK指南

详情参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_client/flow_sdk/README_zh.html

包含

用法类似:

from flow_sdk.client import FlowClient
# use real ip address to initialize SDK
client = FlowClient('127.0.0.1', 9000, 'v1')
 
#client.job.submit(conf_path, dsl_path)

FATE-Flow REST API

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/doc/fate_flow_http_api.html

FATE模型发布及在线联邦推理指南

参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/model_publish_with_serving_guide_zh.html

FATE Pipeline使用

Pipeline介绍参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_client/pipeline/README.html

​ Pipeline 是一个高级 Python API,允许用户以顺序方式设计、启动和查询 FATE 作业。FATE Pipeline 设计为用户友好且与 FATE 命令行工具的行为一致。用户可以通过向管道添加组件来自定义作业工作流,然后通过一次调用启动作业。此外,Pipeline 提供了在拟合管道后运行预测和查询信息的功能。

一个FATE Job 是一个有向无环图

​ FATE 作业是由算法组件节点组成的 dag。FATE 管道提供易于使用的工具来配置任务的顺序和设置。

​ FATE 以模块化风格编写。模块被设计为具有输入和输出数据和模型。因此,当一个模块的输出设置为另一个模块的输入时,两个模块连接在一起。通过跟踪一个数据集是如何通过 FATE 模块处理的,可以看到一个 FATE 作业实际上是由一系列子任务组成的。

pipeline接口

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_client/pipeline/README.html#interface-of-pipeline

Component

FATE 模块被封装component在 Pipeline API 中。每个组件都可以接收和输出DataModel。组件的参数可以在初始化时方便的设置。未指定的参数将采用默认值。所有组件都有一个 name,可以任意设置。组件的名称是它的标识符,因此它在Pipeline 必须是唯一的。我们建议每个组件名称都包含一个编号作为后缀,以便于跟踪。

每个组件都可能具有输入和/或输出Data和/或Model。有关如何使用组件的详细信息,请参阅本 指南

使用指定参数值初始化组件的示例:

hetero_lr_0 = HeteroLR(name="hetero_lr_0", early_stop="weight_diff", max_iter=10,
                       early_stopping_rounds=2, validation_freqs=2)
Input

Input封装了一个组件的所有输入,包括 DataModel输入。要访问input组件,请引用其input属性:

input_all = dataio_0.input
Output

输出封装的部件,其中包括的所有输出结果 Data,并Model输出。要从Output组件访问,请引用其output属性:

output_all = dataio_0.output
Data

Data包装组件的所有数据类型输入和输出。FATE Pipeline 包括五种类型data,每种类型用于不同的场景。有关更多信息,请参阅此处

Model

Model定义组件的模型输入和输出。与 类似Data,这两种类型models用于不同的目的。有关更多信息,请参阅此处

构建pipeline

初始化管道后,应指定作业参与者和发起者。以下是管道初始设置的示例:

pipeline = PipeLine()
pipeline.set_initiator(role='guest', party_id=9999)
pipeline.set_roles(guest=9999, host=10000, arbiter=10000)

Reader需要读入数据源,以便其他组件可以处理数据。定义一个Reader组件:

reader_0 = Reader(name="reader_0")

在大多数情况下,DataIO如下Reader将数据转换为DataInstance格式,然后可以用于数据工程和模型训练。某些组件(例如UnionIntersection)可以直接在非 DataInstance 表上运行。

可以通过设置为不同的角色单独配置所有管道组件get_party_instance。例如,DataIO 可以像这样专门为guest配置组件:

dataio_0 = DataIO(name="dataio_0")
guest_component_instance = dataio_0.get_party_instance(role='guest', party_id=9999)
guest_component_instance.component_param(with_label=True, output_format="dense")

要在管道中包含组件,请使用add_component. 要将DataIO组件添加 到先前创建的管道,请尝试以下操作:

pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))
以 Keras 风格构建 Fate NN 模型

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_client/pipeline/README.html#build-fate-nn-model-in-keras-style

初始化运行时作业参数

为了拟合或预测,用户需要初始化运行时环境,如“backend”和“work_mode”,

from pipeline.runtime.entity import JobParameters
job_parameters = JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE)

运行pipeline

添加所有组件后,用户需要在运行设计的作业之前首先编译管道。编译后,可以使用适当的Backend和来拟合(运行训练作业)管道WorkMode

pipeline.compile()
pipeline.fit(job_parameters)

任务查询

FATE Pipeline 提供 API 来查询组件信息,包括数据、模型和摘要。所有查询 API 都具有与FlowPy匹配的名称 ,而 Pipeline 检索查询结果并将其直接返回给用户。

summary = pipeline.get_component("hetero_lr_0").get_summary()

部署组件

一旦pipeline 拟合完成,就可以在新数据集上运行预测。在预测之前,需要首先部署必要的组件。此步骤标记要由预测管道使用的选定组件。

# deploy select components
pipeline.deploy_component([dataio_0, hetero_lr_0])
# deploy all components
# note that Reader component cannot be deployed. Always deploy pipeline with Reader by specified component list.
pipeline.deploy_component()

使用pipeline进行预测

首先,启动一个新的管道,然后指定用于预测的数据源。

predict_pipeline = PipeLine()
predict_pipeline.add_component(reader_0)
predict_pipeline.add_component(pipeline,
                               data=Data(predict_input={pipeline.dataio_0.input.data: reader_0.output.data}))

然后可以在新pipeline上启动预测。

predict_pipeline.predict(job_parameters)

此外,由于pipeline是模块化的,用户可以在运行预测之前向原始pipeline添加新组件。

predict_pipeline.add_component(evaluation_0, data=Data(data=pipeline.hetero_lr_0.output.data))
predict_pipeline.predict(job_parameters)

pipeline的保存和恢复

要保存管道,只需使用转储接口。

pipeline.dump("pipeline_saved.pkl")

要恢复管道,请使用load_model_from_file接口。

from pipeline.backend.pipeline import PineLine
PipeLine.load_model_from_file("pipeline_saved.pkl")

pipeline汇总信息

要获取管道的详细信息,请使用describe接口,它会打印“创建时间”拟合或预测状态以及构建的 dsl(如果存在)。

pipeline.describe()

管道与 CLI

在过去的版本中,用户通过命令行界面与 FATE 交互,通常使用手动配置的 conf 和 dsl json 文件。手动配置既乏味又容易出错。FATE Pipeline 在编译时自动形成任务配置文件,允许快速尝试任务设计。

日志

FATE-Flow服务日志:/fate/logs/fate_flow/

任务日志:/fate/logs/$job_id/

常见问题

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_flow/README_zh.html#id10

测试样例

FATE Board任务查看界面:http://192.168.1.75:8080/#/history

参考:https://fate.readthedocs.io/en/latest/_build_temp/examples/experiment_template/user_usage/dsl_v2_predict_tutorial.html

测试用例1-上传文件

python /fate/examples/pipeline/demo/pipeline-upload.py --base  /fate

pipeline-upload.py内容如下:

将examples/data/breast_hetero_guest.csv改为自己想上传的文件

import os
import argparse

from pipeline.backend.config import Backend, WorkMode
from pipeline.backend.pipeline import PipeLine

# path to data
# default fate installation path
DATA_BASE = "/data/projects/fate"

# site-package ver
# import site
# DATA_BASE = site.getsitepackages()[0]


def main(data_base=DATA_BASE):
    # parties config
    guest = 9999
    # 0 for eggroll, 1 for spark
    backend = Backend.EGGROLL
    # 0 for standalone, 1 for cluster
    work_mode = WorkMode.STANDALONE
    # use the work mode below for cluster deployment
    # work_mode = WorkMode.CLUSTER

    # partition for data storage
    partition = 4

    # table name and namespace, used in FATE job configuration
    dense_data = {"name": "breast_hetero_guest", "namespace": f"experiment"}
    tag_data = {"name": "breast_hetero_host", "namespace": f"experiment"}

    pipeline_upload = PipeLine().set_initiator(role="guest", party_id=guest).set_roles(guest=guest)
    # add upload data info
    # path to csv file(s) to be uploaded, modify to upload designated data
    pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_guest.csv"),
                                    table_name=dense_data["name"],             # table name
                                    namespace=dense_data["namespace"],         # namespace
                                    head=1, partition=partition)               # data info

    pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_host.csv"),
                                    table_name=tag_data["name"],
                                    namespace=tag_data["namespace"],
                                    head=1, partition=partition)

    # upload data
    pipeline_upload.upload(work_mode=work_mode, backend=backend, drop=1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser("PIPELINE DEMO")
    parser.add_argument("--base", "-b", type=str,
                        help="data base, path to directory that contains examples/data")

    args = parser.parse_args()
    if args.base is not None:
        main(args.base)
    else:
        main()

测试用例2-训练和评估模型

pipeline搭建:

python /fate/examples/pipeline/demo/pipeline-quick-demo.py

pipeline-quick-demo.py内容如下:

import json
from pipeline.backend.config import Backend, WorkMode
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, DataIO, Intersection, HeteroSecureBoost, Evaluation
from pipeline.interface import Data
from pipeline.runtime.entity import JobParameters

# table name & namespace in data storage
# data should be uploaded before running modeling task
guest_train_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# initialize pipeline
pipeline = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999, host=10000)

# define components
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role="guest", party_id=9999).component_param(table=guest_train_data)
reader_0.get_party_instance(role="host", party_id=10000).component_param(table=host_train_data)
dataio_0 = DataIO(name="dataio_0", with_label=True)
dataio_0.get_party_instance(role="host", party_id=10000).component_param(with_label=False)
intersect_0 = Intersection(name="intersection_0")
hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
                                         num_trees=5,
                                         bin_num=16,
                                         task_type="classification",
                                         objective_param={"objective": "cross_entropy"},
                                         encrypt_param={"method": "iterativeAffine"},
                                         tree_param={"max_depth": 3})
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")

# add components to pipeline, in order of task execution
pipeline.add_component(reader_0)\
    .add_component(dataio_0, data=Data(data=reader_0.output.data))\
    .add_component(intersect_0, data=Data(data=dataio_0.output.data))\
    .add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))\
    .add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))

# compile & fit pipeline
pipeline.compile().fit(JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE))
# to run this task with cluster deployment, use the following setting instead
# may change data engine backend according to actual environment
# pipeline.compile().fit(JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.CLUSTER))


# query component summary
print(f"Evaluation summary:\n{json.dumps(pipeline.get_component('evaluation_0').get_summary(), indent=4)}")

fate flow搭建:

(用户可以到/fate/examples/dsl/v2/寻找合适的算法和配置文件替换)

flow job submit -c /fate/examples/dsl/v2/hetero_secureboost/test_secureboost_train_complete_secure_conf.json -d /fate/examples/dsl/v2/hetero_secureboost/test_secureboost_train_dsl.json

测试用例3-模型训练、部署及预测

搭建一个hetero secureboost 模型,并使用模型预测

pipeline搭建

pipeline

参考:https://fate.readthedocs.io/en/latest/_build_temp/examples/experiment_template/user_usage/pipeline_predict_tutorial.html

1.上传文件(测试用例1已经上传了)

2.主机环境下创建文件:

cd /home/docker_standalone_fate_1.6.0/fate_job/
vim fit_Hetero_SecureBoost_model.py

文件内容如下:(用户可参照/fate/examples/pipeline/中对应算法,将代码与算法列表https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/README.html#algorithm-list对应算法介绍结合有助于理解)

from pipeline.backend.config import Backend, WorkMode # configs
from pipeline.backend.pipeline import PipeLine # Pipeline 
from pipeline.component import Reader, DataIO, Intersection, HeteroSecureBoost # fate components
from pipeline.interface import Data  # data flow
from pipeline.runtime.entity import JobParameters # parameter class

# define dataset name and namespace
guest_train_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# initialize pipeline, set guest as initiator and set guest/host party id
pipeline = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999, host=10000)

# define components
# reader read raw data 
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role="guest", party_id=9999).component_param(table=guest_train_data)
reader_0.get_party_instance(role="host", party_id=10000).component_param(table=host_train_data)
# data_io transform data
dataio_0 = DataIO(name="dataio_0", with_label=True)
dataio_0.get_party_instance(role="host", party_id=10000).component_param(with_label=False)
# find sample intersection using Intersection components
intersect_0 = Intersection(name="intersection_0")
# hetero secureboost components, setting algorithm parameters
hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
				     num_trees=5,
				     bin_num=16,
				     task_type="classification",
				     objective_param={"objective": "cross_entropy"},
				     encrypt_param={"method": "iterativeAffine"},
				     tree_param={"max_depth": 3})

# add components to pipeline, in the order of task execution
pipeline.add_component(reader_0)\
.add_component(dataio_0, data=Data(data=reader_0.output.data))\
.add_component(intersect_0, data=Data(data=dataio_0.output.data))\
.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))

# compile & fit pipeline
pipeline.compile().fit(JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE))

# save train pipeline
pipeline.dump("pipeline_saved.pkl")

docker环境下执行:

cd  /fate/fate_job
python fit_Hetero_SecureBoost_model.py

3.主机环境下创建文件

vim predict_instances_by_Hetero_SecureBoost_model.py

文件内容如下:(用户自行将breast_hetero_guest改成上传的表名)

from pipeline.backend.pipeline import PipeLine
from pipeline.component.reader import Reader
from pipeline.interface.data import Data
from pipeline.backend.config import Backend, WorkMode # configs
from pipeline.runtime.entity import JobParameters # parameter class

# load train pipeline
pipeline = PipeLine.load_model_from_file('pipeline_saved.pkl')
# deploy components in training step
pipeline.deploy_component([pipeline.dataio_0, pipeline.intersection_0, pipeline.
                           hetero_secureboost_0])
# set new instances to predict
# new dataset
guest_train_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# set new reader
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role="guest", party_id=9999).component_param(table=guest_train_data)
reader_0.get_party_instance(role="host", party_id=10000).component_param(table=host_train_data)

# new predict pipeline
predict_pipeline = PipeLine()
# update reader
predict_pipeline.add_component(reader_0)
# add selected components from train pipeline onto predict pipeline
predict_pipeline.add_component(pipeline,data=Data(predict_input={pipeline.dataio_0.input.data: reader_0.output.data}))
# run predict model
predict_pipeline.predict(JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE))

docker环境下执行,FATE Board查看结果

python predict_instances_by_Hetero_SecureBoost_model.py

flow命令搭建

参考:https://fate.readthedocs.io/en/latest/_build_temp/examples/experiment_template/user_usage/dsl_v2_predict_tutorial.html

(1).训练模型

cd /fate/examples/
flow job submit -c dsl/v2/hetero_secureboost/test_secureboost_train_binary_conf.json -d dsl/v2/hetero_secureboost/test_secureboost_train_dsl.json

(2).使用 flow_client 来部署预测任务中需要的组件

记得修改model-id,model-version

flow model deploy --model-id guest-9999#host-9998#model --model-version 2021090109322084026031 --cpn-list "reader_0, dataio_0, intersection_0, hetero_secure_boost_0"

如果执行返回:

{
    "retcode": 100,
    "retmsg": "'Pipeline'"
}

是因为模型还没有搭建完成,可以到board里面查看,等完成后再执行部署

(3).预测文件

用部署文件返回的model_id 、model_version、数据集名字替换 **/fate/examples/dsl/v2/**hetero_secureboost/test_predict_conf.json 内容中的 model_id 、model_version ,替换后的内容保存到 /fate/fate_job/new_test_predict_conf.json(新建文件)

执行

flow job submit -c /fate/fate_job/lxy/new_test_predict_conf.json

如果返回以下内容,是因为host,guest party_id与上传文件host,guest party_id的不一致

{
	'data':'No such file or directory',
	'retcode':100,
	'retmsg':"2"
}

如果提交任务返回如下内容,是因为没有执行部署操作:

{
    "retcode": 100,
    "retmsg": "Model arbiter-10000#guest-9999#host-10000#model 20210908033432743389158 has not been deployed yet."
}

测试用例4-独立使用pipeline搭建模型

1.选定合适的算法和数据集

下载FATE源码

git clone https://github.com/FederatedAI/FATE.git

打开examples目录下的data/READMA.md文件,查看测试数据集信息,使用线性回归模型HeteroLinR,和标签是连续值的student_hetero数据集

2.上传数据集

参考测试用例1

3.编写训练和验证pipeline代码

在测试用例3 pipeline代码基础上修改,

可以结合HeteroLinR算法详情https://fate.readthedocs.io/en/latest/_build_temp/python/federatedml/linear_model/linear_regression/README.html还有源码FATE/FATE-master/examples/pipeline/hetero_linear_regression/pipeline-hetero-linr.py两个文件理解该算法。

发现HeteroLinR pipeline搭建与测试用例3 HeteroSecureBoost模型搭建相比,需要多配置一个arbiter,其他需要改的操作有:修改数据集名称,HeteroLinR模型可以参数可以参考pipeline-hetero-linr.py里模型参数,此处只设置名字

创建linr_model_train_and_evaluation.py文件

代码如下:

from pipeline.backend.config import Backend, WorkMode # configs
from pipeline.backend.pipeline import PipeLine # Pipeline
from pipeline.component import Reader, DataIO, Intersection, HeteroSecureBoost, HeteroLinR ,Evaluation# fate components
from pipeline.interface import Data  # data flow
from pipeline.runtime.entity import JobParameters # parameter class

# define dataset name and namespace
guest_train_data = {"name": "student_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "student_hetero_host", "namespace": "experiment"}

# initialize pipeline, set guest as initiator and set guest/host party id
pipeline = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999, host=10000, arbiter=10000)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=9999).component_param(table=guest_train_data)
reader_0.get_party_instance(role='host', party_id=10000).component_param(table=host_train_data)

dataio_0 = DataIO(name="dataio_0")
dataio_0.get_party_instance(role='guest', party_id=9999).component_param(with_label=True, label_name="y",
																		 label_type="int", output_format="dense")
dataio_0.get_party_instance(role='host', party_id=10000).component_param(with_label=False)

intersection_0 = Intersection(name="intersection_0")
hetero_linr_0 = HeteroLinR(name="hetero_linr_0")

evaluation_0 = Evaluation(name="evaluation_0", eval_type="regression", pos_label=1)
 
pipeline.add_component(reader_0)
pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersection_0, data=Data(data=dataio_0.output.data))
pipeline.add_component(hetero_linr_0, data=Data(train_data=intersection_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_linr_0.output.data))

pipeline.compile()

job_parameters = JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE)
pipeline.fit(job_parameters)

pipeline.dump("/fate/fate_job/lxy/hetero_linr_pipeline_saved.pkl")#保存模型

docker执行该文件:

cd /fate/fate_job/lxy
python linr_model_train_and_evaluation.py

4.编写预测 pipeline代码

创建linr_model_predict.py文件,在测试用例3 pipeline代码基础上修改,只需要修改表名和模型名即可.

from pipeline.backend.pipeline import PipeLine
from pipeline.component.reader import Reader
from pipeline.interface.data import Data
from pipeline.backend.config import Backend, WorkMode # configs
from pipeline.runtime.entity import JobParameters # parameter class

# load train pipeline
pipeline = PipeLine.load_model_from_file('/fate/fate_job/lxy/hetero_linr_pipeline_saved.pkl')
# deploy components in training step
pipeline.deploy_component([pipeline.dataio_0, pipeline.intersection_0, pipeline.
                           hetero_linr_0])
# set new instances to predict
# new dataset
guest_train_data = {"name": "student_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "student_hetero_host", "namespace": "experiment"}

# set new reader
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role="guest", party_id=9999).component_param(table=guest_train_data)
reader_0.get_party_instance(role="host", party_id=10000).component_param(table=host_train_data)

# new predict pipeline
predict_pipeline = PipeLine()
# update reader
predict_pipeline.add_component(reader_0)
# add selected components from train pipeline onto predict pipeline
predict_pipeline.add_component(pipeline,data=Data(predict_input={pipeline.dataio_0.input.data: reader_0.output.data}))
# run predict model
predict_pipeline.predict(JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE))

FATE TEST

运行 FATE 测试的有用工具的集合。

quick start

参考:https://fate.readthedocs.io/en/latest/_build_temp/python/fate_test/README.html#quick-start

编辑默认的 fat_test_config.yaml

更改 /usr/local/lib/python3.6/site-packages/fate_test/fate_test_config.yaml文件,

data_base_dir: path(FATE)

更改为

data_base_dir: /fate/

运行 fate_test 套件

套件:用于运行测试套件,收集 FATE 作业,Testsuite测试套件 用于按顺序运行一组作业。用于作业的数据可以在提交作业之前上传,也可以在作业完成后进行清理。这个工具对于 FATE 的发布测试很有用。

#fate_test suite -i <path contains *testsuite.json>
fate_test suite -i /fate/examples/dsl/v1/homo_nn/testsuite.json
#fate_test suite -i /fate/examples/dsl/v1/hetero_pearson/testsuite.json

运行一些 fate_test 基准测试

用于比较 FATE 和其他机器学习系统之间的建模质量的 benchmark-quality

#fate_test benchmark-quality -i <path contains *benchmark.json>
fate_test benchmark-quality -i /fate/examples/benchmark_quality/hetero_linear_regression/hetero_linr_benchmark.json

开发指南

开发算法模块,详情参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/develop_guide_zh.html

要开发模块,需要执行以下 5 个步骤。

  1. 定义将在此模块中使用的 python 参数对象。

  2. 定义模块的 Setting conf json 配置文件。

  3. 如果模块需要联邦,则需定义传输变量配置文件。

  4. 您的算法模块需要继承model_base类,并完成几个指定的函数。

  5. 定义模型保存所需的protobuf文件。

  6. 若希望通过python脚本直接启动组件,需要在fate_client中定义Pipeline组件。

API

计算API

初始化一个计算会话(computing session),参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/api/computing.html

联邦API

包括低级APIFederationABC()和用户接口secure_add_example_transfer_variable,参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/api/federation.html

参数

类的参数详情参考:https://fate.readthedocs.io/en/latest/_build_temp/doc/api/params.html

遇到过的报错信息和解决方案

Federated schedule error, Please check rollSite and fateflow

报错信息:Federated schedule error, Please check rollSite and fateflow network connectivityrpc request error: <_InactiveRpcError of RPC that terminated…

原因:集群通信问题

解决方案:查看配置文件的 work mode设置与fate部署方式对应(单机 or 集群)

标签:pipeline,FATE,fate,hetero,介绍,详细,data,guest
来源: https://blog.csdn.net/weixin_38235865/article/details/120322777