其他分享
首页 > 其他分享> > 机器学习模型并行训练

机器学习模型并行训练

作者:互联网

 

 

机器学习模型的并行训练方法概括分为两类:

1、数据并行

2、模型并行

 

关于两者区别可参考

https://leimao.github.io/blog/Data-Parallelism-vs-Model-Paralelism/

 

 

数据并行

pytorch提供了torch.nn.parallel.DistributedDataParallel接口实现模型并行训练,具体可参考该网址

https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html

并行原理简单来说是把一个batch的数据分成多份,每份数据包含原来batch里的一部分样本,可看做是把一个batch划分为多个微批,这些微批同时分配到不同gpu(节点)上进行前馈反馈处理,这些节点的模型都是完整模型的拷贝,前馈反馈完成后汇集各自的梯度进行平均,同步到各个节点实现参数更新,然后进入下个batch的训练。

 

模型并行

模型并行是把原本完整的模型拆分成几部分,每部分在一个单独的节点(cpu或gpu)进行计算,数据如同流水线一般在不同节点间传递计算,每个节点处理的数据都是一个完整的batch,这是与数据并行的不同点,详细内容可参考

https://zhuanlan.zhihu.com/p/71566775

https://zhuanlan.zhihu.com/p/87596314

https://www.cnblogs.com/rossiXYZ/p/15681576.html

 

 

混合并行

下图是混合并行架构的简化展示,只为便于理解概念。混合并行即同时使用模型并行与数据并行进行模型训练。

从进程外部来看,数据被分为batch1 、batch2,可看做是一个batch分为了两个子集,每个子集输入到对应的进程计算,最后计算出平均梯度更新参数,这是数据并行。

从进程内部看,进程1中的model1、model2分别是进程2中model拆分的两个子部分,batch1 输入到model1,其输出又作为model2的输入,最后得出整个模型的输出,这是模型并行。

 

 

 

 

自定义分布式并行训练

本节内容是对pytorch官网

https://pytorch.org/tutorials/beginner/dist_overview.html

关于并行与分布式训练的总结。

自定义分布式训练是指利用较底层的api实现定制的分布式并行模型。

 

可使用的接口大致三类,分别是

1、底层api(点对点通信)

主要指torch.distributed.send  torch.distributed.isend 

torch.distributed.recv   torch.distributed.irecv等函数,该类函数属于通信原语,可实现点对点的同步或异步通信,可基于此实现较复杂的通信模式。

详细可见

https://pytorch.org/docs/stable/distributed.html

 

 

2、中层api

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=- 1.0)

该函数是同步通信接口,他的功能是在指定worker上执行指定的函数,返回值是执行结果。参数1是worker的标识,参数2是需要执行的函数。

torch.distributed.rpc.rpc_async与torch.distributed.rpc.rpc_sync的区别在于,前者是异步接口,返回值是torch.future类型,可使用torch.futures.wait_all函数等待future的执行结果。

 

torch.distributed.rpc.remote函数也是异步接口,效果与

torch.distributed.rpc.rpc_async类似,不同的是前者返回的是torch.distributed.rpc.RRef对象,RRef是对远程对象的引用,这里的远程对象就是worker的执行结果。RRef对象引用的数据可由to_here函数获取。

 

pytorch针对RRef类型额外提供了基于以上函数包装的工具函数,例如

RRef.rpc_sync 、 RRef.rpc_async 、  RRef.remote , 分别对应上述三个函数,这些工具函数不再需要提供参数to,而是自己解析出来,简化开发者工作。

 

3、高层模式api

以all_reduce为代表的高层api实现了固定模式的数据传输方式,这部分不是本文重点,可自研。

https://pytorch.org/docs/stable/distributed.html 

 

实例讲解

下面通过实例着重介绍中层api的使用方法。以下实例是上述混合并行架构的具体实现。

代码中启动了3个进程,一个为master,负责将数据分成2份,统筹各个worker计算并实现分布式求导及参数更新;每个worker负责具体的前馈运算,运算规则分别由NetShard1 , NetShard2两个类定义,两个类内部的数据流转都是linear->relu->linear,区别在于NetShard1是在一个设备上中完成计算;NetShard2的一部分在cuda:0上计算,然后再传到cuda:1上完成后续操作。

 

import os
import time
import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef


num_classes = 1000  #分类个数

#对应图1右侧进程里的模型
class NetShard1(nn.Module):
    def __init__(self,dev):
        super(NetShard1, self).__init__()
        self.device = dev
        self.net1 = nn.Linear(10, 10).to(self.device)
        self.relu = nn.ReLU().to(self.device)
        self.net2 = nn.Linear(10, 5).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        out = self.net2(self.relu(self.net1(x)))
        return out.cpu()

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]

#对应图1左侧进程里的模型
class NetShard2(nn.Module):
    def __init__(self,dev1,dev2):
        super(NetShard2, self).__init__()
        self.device1 = dev1
        self.device2 = dev2
        self.net1 = nn.Linear(10, 10).to(self.device1)
        self.relu = nn.ReLU().to(self.device2)
        self.net2 = nn.Linear(10, 5).to(self.device2)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device1)
        out = self.net2(self.relu(self.net1(x).to(self.device2)))
        return out.cpu()

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class MyNet(nn.Module):
    """
    Assemble two parts as an nn.Module and define train logic
    """
    def __init__(self, workers, *args, **kwargs):
        super(MyNet, self).__init__()

        # Put the first part of the MyNet on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            NetShard1,
            args = ("cpu",) + args,
            kwargs = kwargs
        )

        # Put the second part of the MyNet  on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            NetShard2,
            args = ("cuda:0","cuda:1") + args,
            kwargs = kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into 2-batches, and collect async RPC
        x = xs.chunk(xs,2,dim=0)
        x_rref1 = RRef(x[0])
        x_rref2 = RRef(x[1])

        futs = [self.p1_rref.rpc_async().forward(x_rref1), self.p2_rref.rpc_async().forward(x_rref2)]

        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(futs),dim=0)

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params


#########################################################
#                   Run RPC Processes                   #
#########################################################

num_batches = 3
batch_size = 120
cols = 128
rows = 128


def run_master():

    # put the two model parts on worker1 and worker2 respectively
    model = MyNet(["worker1", "worker2"])
    loss_fn = nn.MSELoss()
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for i in range(num_batches):
        print(f"Processing batch {i}")
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, rows, cols)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # The distributed autograd context is the dedicated scope for the
        # distributed backward pass to store gradients, which can later be
        # retrieved using the context_id by the distributed optimizer.
        with dist_autograd.context() as context_id:
            outputs = model(inputs)
            dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
            opt.step(context_id)


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'

    # Higher timeout is added to accommodate for kernel compilation time in case of ROCm.
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)

    if rank == 0:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master()
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 3
    tik = time.time()
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)
    tok = time.time()
    print(f"execution time = {tok - tik}")

 

标签:nn,训练,distributed,模型,torch,并行,rpc,self
来源: https://www.cnblogs.com/candl/p/16412267.html