编程语言
首页 > 编程语言> > Lotus | Filecoin | Sector-storage部分源码与模块理解

Lotus | Filecoin | Sector-storage部分源码与模块理解

作者:互联网

深入理解存储与管理

Filecoin的存储单元称之为扇区(机械硬盘的最小存储单元就叫做扇区)。在miner进行数据存储的时候会向filecoin网络提供一系列的证明保证正确存储了数据。在证明sector存储的过程中需要经历一系列的处理,P1/P2/C1/C2。P-Precommit,预提交;C-commit 提交。 Filecoin的存储管理逻辑主要是现在sector-storage中。 在这之中 Worker、Sechduler、Manager三者是绕不开的话题。

相关模块

Woekr主要分为localworker以及remoteworker,localworker处理本地服务,remoteworker支持远程服务处理。 Manager-管理多个Worker Sechduler-调度器调度多个Worker,一个Manager通常都会有一个Sehcduler Store-Store存储系统

README图解

README中的图分为上下两个部分,上面是Manager它包含了:store存储系统、localWorker和scheduler下面是remoteWorker。下面灰色的部分都是remoteWorker manager对上面的模块进行管理,使用scheduler对Worker进行管理,通常一个Manager对应一个Worker。Worker分为localworker以及remoteworker,

工作流程:Manager管理scheduler调度调度器,调度器管理多个worker。每一个连接到manager的worker都活同步他的CPU内存以及显存的信息。worker分为localworker以及remoteworker scheduler接受请求,根据请求类型以及资源请求选择合适的worker为他进行资源分配

主要模块:scheduler worker manager

模块1 schedule调度模块

模块的主要功能:资源分配与调度 模块文件:resource.go/cbor_gen.go/sched.go/sched_resource.go/sche_worker.go

resource.go模块

此模块描述了运行时资源分配情况以及性能参数。

重要数据结构:

描述资源情况,他的方法是对Thread进行资源的分配

type Resources struct {
	MinMemory uint64  
	MaxMemory uint64  

	MaxParallelism int  
	CanGPU         bool

	BaseMinMemory uint64
}

重要函数方法:

线程资源分配函数
资源情况映射表,通过封装任务的Task类型和封装证明映射到对应的资源,用于查询此任务的资源占用情况。
/*

 Percent of threads to allocate to parallel tasks

 12  * 0.92 = 11
 16  * 0.92 = 14
 24  * 0.92 = 22
 32  * 0.92 = 29
 64  * 0.92 = 58
 128 * 0.92 = 117

*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
n := (wcpus * ParallelNum) / ParallelDenom //计算公式
初始化resrouceTable的函数方法

cbor_gen.go模块

做各种角色的JSON包和cbor之间的转换 。编码和解码的集合,对象分别是 Call、WorkStae、 WorkID

重要方法函数:

将调用信息的JSON数据转换成cbor简明二进制展现
将cbor数据转换成数据结构打包的JSON数据

sched.go模块

进行资源调度和分配的核心模块 是scheduler调度器方法的主要定义部分

关键类型::

用于描述调度窗口,存放了worker的请求数组

type schedWindow struct {
	allocated activeResources
	todo      []*workerRequest
}
优先级类型
用context表示JSON数据格式类型
表示一个匿名函数 

type WorkerAction func(ctx context.Context, w Worker) error
一个很重要的类型——调度器——用于分配计算机资源给不同的任务.在Manager类与WorkScheduler类中都有他的身影
//分配器
type scheduler struct {
	workersLk sync.RWMutex
	workers   map[WorkerID]*workerHandle

	schedule       chan *workerRequest
	windowRequests chan *schedWindowRequest
	workerChange   chan struct{} // worker added / changed/freed resources
	workerDisable  chan workerDisableReq

	// owned by the sh.runSched goroutine
	schedQueue  *requestQueue
	openWindows []*schedWindowRequest

	workTracker *workTracker


	info chan func(interface{})

	//关闭中的 已经关闭的 测试同步通道
	closing  chan struct{}
	closed   chan struct{}
	testSync chan struct{} // used for testing
}
这个结构体根据目前的资源情况去调度分配 对sector、Windows、activeResource进行操作
计算机目前的活跃资源
资源分配窗口 显示当前计算机的活跃资源以及矿工提出的请求的序列
矿工请求处理窗口 把矿工所请求的已完成的任务存储到通道中 用此数据结构表示并存储一个矿工已经完成的任务
矿工具体的请求的数据结构。内部的请求信息包含了山区ID、证明信息、封装任务的任务类型、该任务的优先级、任务的开始时间、在heap存储空间中的索引下标、worker错误相应通道等
错误响应信息的数据结构
分配诊断请求信息结构体。包含了扇区ID 、封装任务类型以及该调度信息的优先级
资源诊断信息结构体,包含了请求信息的集合以及打开的窗口的集合

关键函数方法:

从context中获取任务的优先级
给任务赋上优先级
处理worker的错误信息 打包成error类并返会
创建一个新的调度器 返回该调度器的指针
调度器的成员函数 ,开始进入监听状态等待矿工的请求
进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。
尝试进行资源分配的核心过程函数。
任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
1. 为每个调度队列中的任务找到能处理他们的可容纳窗口
	1.1 创建解决任务的窗口容量的列表 acceptableWindws slice
	1.2 根据任务选择器的性能为窗口排序
2. 再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
3. 把被调度后的窗口提交给Worker
关闭调度处理的函数,遍历矿工列表,清理为他们分配的资源,释放矿工所占用的窗口
获取调度器信息的函数
关闭调度器函数 关闭失败则返回报错信息

sched_resource.go模块

分配器-计算机资源方面进行查询判断操作的包

关键函数方法::

(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error 

对worker分配资源的入口函数 并把它所需要的资源加入activeResource活动资源的结构体中
把worker所需要的资源加入到activeResource中的具体实现
释放woker所占的资源 并删除冲activeResource中删除对应的资源信息
查询目前的activeResource是否足够该woker所请求的资源 返回布尔值 true表示成功 false表示不能分配
查看目前活跃资源当中CPU占用率 最小内存空间和最大内存空间占用率 返回三个当中一个最大的,活动资源的利用率
返回woker占用的所有活动资源的占用率之和

sched_worker.go

woker分配数据结构。统揽。定义了schedWorker以及此结构体的一众方法 , Woker资源调度的具体实现

关键数据结构:

用于对Worker进行资源分配的结构体,schedWorker结构 包含了一个资源调度器 wokerer信息查询 workerID 分配窗口的通道 请求窗口数量

type schedWorker struct {
	sched  *scheduler
	worker *workerHandle

	wid WorkerID

	heartbeatTimer   *time.Ticker
	scheduledWindows chan *schedWindow
	taskDone         chan struct{}

	windowsRequested int
}

schedWorker通过workHandle来对worker进行管理 资源分配。workerHandle处理结构体:

type workerHandle struct {
	workerRpc Worker		//工人的rpc调用接口

	info storiface.WorkerInfo //工人的信息

	preparing *activeResources
	active    *activeResources

	lk sync.Mutex

	wndLk         sync.Mutex
	activeWindows []*schedWindow

	enabled bool

	// for sync manager goroutine closing
	cleanupStarted bool
	closedMgr      chan struct{}
	closingMgr     chan struct{}
}

这是核心的调度器数据结构的定义, 调度器的workers大映射表是从WorkerID到workerHandle解决方案

type scheduler struct {
	workersLk sync.RWMutex
	workers   map[WorkerID]*workerHandle

	schedule       chan *workerRequest
	windowRequests chan *schedWindowRequest
	workerChange   chan struct{} // worker added / changed/freed resources
	workerDisable  chan workerDisableReq

	// owned by the sh.runSched goroutine
	schedQueue  *requestQueue
	openWindows []*schedWindowRequest

	workTracker *workTracker


	info chan func(interface{})

	//关闭中的 已经关闭的 测试同步通道
	closing  chan struct{}
	closed   chan struct{}
	testSync chan struct{} // used for testing
}

关键函数方法::

该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数;

context类型的参数用于读取其其中的信息以对后续的worker调度进行设置

处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。 空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
worker清空函数,关闭worker的manager管理器以及worker,用内置close()函数关闭manager。通过删除调度器中打开窗口中的wokerID窗口,从而删除worker。
使得context所表示的内容无效化。先在主调度器的线程中等待清理程序,再等待清理程序完成以使其无效化,最终清空活动窗口,使得请求窗口清0
对session进行检查
检查所有的请求窗口
等待窗口的更新或者是worker任务的完成
把老窗口中的任务挪动到新窗口当中,将worker和窗口绑定在一起
将任务分配到目前所有的活动窗口中
开始执行一个任务的入口函数
核心函数:
runSched():准备进入资源调度状态,执行此函数则进入监听状态,监听scheduler对象中通道的变化,并作出对应的决策
trySched():调度资源的核心实现,把请求的任务分配给窗口去
runWorker():开始对worker的请求进行处理,如果在其他线程中存在解决方案则直接返回,否则开始处理,开启一个goroutinue

模块2 Manager模块

模块文件:Manager.go\Manager_calltracker.go

Manager.go

重要数据结构:

包含manger的各种信息,结构体中包含了本地存储的接口、远程存储信息、本地存储类、Scheduler调度器以及storiface.CallID到WorkerID和chan result的映射等。
存储结果信息,有两个参数:interface{}结构体和Error出错信息类。
参数中含有能并行获取的数量限制以及一系列布尔型的变量,用于配置。 布尔型变量中包含了:AllowCommit 允许提交/AllowAddPiece 允许增加piece/AllowPreCommit1 允许预提交1/AllowPreCommit2 允许预提交2/AllowUnseal 允许解封扇区。

重要函数方法:

New这个函数负责创建一个新的manager,执行过程大致为:
1.首先新建一个本地的存储点,如果创建失败则跳出函数返回错误信息
2.随后新建一个provider,如果创建失败则跳出函数返回错误信息
3.新建一个线上(远距离)的存储点
4.将新的manager信息放入参数m中,设置m的基本参数并运行
5.新建任务,并且设置任务的基本参数 也就是sealerConfig中的参数
6.增加工人,如果增加失败返回错误信息
7.新建manager完成
这函数功能是增加当地存储空间,会报三种错误1.增加储存空间错误2.已经存在该存储空间错误3.存储空间配置错误
增加worker,调用sched文件中的runWork函数。开始进行worker请求处理,如果在其他线程中存在解决方案则直接返回,否则开始处理(这个地方的请求是增加Worker 并且传递过去了Worker类型的参数
提供远程的HTTP服务
空计划指令,什么都不进行操作
获取计划,经理通过等待worker获得计划
从指定扇区中取出一部分数据,通过调用函数tryReadUnsealedPiece()来尝试读取扇区中已经被解封的数据
尝试读取没封装的数据
在sector中新建文件存储片区,执行步骤:1.判锁2.查看是否有存在的片区,如果有则分配,如果没有则新建3.录入片区的一系列信息并返回
执行上述这一系列操作
这个模块就是将需要存储的信息传递给manager并且释放worker中扇区和缓存中无用的信息。新建选择其,并调用manager中的scheduler对象的方法,为worker分配资源。
释放没有被封装的(函数里也没有具体的实现)只有一句提示“即将执行释放操作”
移除封装或者未封装的扇区,或者移除缓冲区
代码复用,返回是否执行成功的结果

该函数方法总结:

此函数定义了大量的接口函数,通过manager进行资源调度、存储分配、远程HTTP服务、读取解封之后的扇区中的数据操作

manager_calltracker.go

重要数据结构:

代表了工作的代号,参数有method表示worker所进行封装的封装任务的任务类型
用于表示JSON格式的参数
代表工作状态的结构体,包含了工作的编号WorkID,工作的阶段(已经开始、执行中、已完成)、storiface.CallID到WorkID的绑定容器、专门用于保存work的错误、worker的主机名以及Work开始时间
这是一个枚举类型,具体表示了三种工作状态
const (
	wsStarted WorkStatus = "started" // task started, not scheduled/running on a worker yet
	wsRunning WorkStatus = "running" // task running on a worker, waiting for worker return
	wsDone    WorkStatus = "done"    // task returned from the worker, results available
)

重要函数方法:

新建工作 会出现调度错误,最后返回WorkID对象
这个模块主要就是负责对正在执行的工作设置追踪器(只有处于状态wsRunning才会设置)
获得工作状态,并执行cancel操作,将出现错误的程序或者没有被追踪的程序cancel掉。 1.新建一个workid 2.查看是否新建成功 3.查看workid是否存在,如果不存在返回错误 4.如果不存在则开始一个新的work,并且根据该work的状态执行操作 5.如果workid存在 说明程序已经运行或者已经被追踪,则返回true
开始工作并设置追踪其
首先获取Work的状态,获取失败则报错,其次判断work是否处于started阶段 是则返回,最后判断work是否被正确追踪,错误则返回
调用waitCall 等待调用
返回结果信息
允许中途结束work

模块3 Worker工作者模块

模块文件:worker_ calltracker.go worker_local.go worker_tracked.go

worker_calltracker.go

主要是描述call指令的几种状态:开始、完成、结果返回、未完成

worker_local.go

关键数据结构:

此类中包含了工作种类和是否允许交换的标志
存储空间、本地存储、扇区标号、返回信息、执行操作、是否允许交换、指令追踪、已经接受的任务、运行、任务锁、阶段、是否可以工作、关闭

关键函数方法:

新建一个本地的工人
申请一块扇区
异步调用 主要负责跟踪调用Call
尝试将操作结果返回给manager,如果操作成功就返回ture
新建扇区
在扇区中新增存储块
结束部分 删除不需要的信息 避免其占用内存
释放未封装的存储数据
解封并清除扇区或者缓存中的信息
转移存储
解封存储块,并且将没有用的信息删除
获取存储、cpu、gpu等信息

静态设置

模块4 Seletor 选择器模块

模块关键包含:selector_alloc.go selector_existing.go selector_task.go

selector_alloc.go

重要函数方法:

新建一个分配选择器
分配选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
compare,比较两个工人的对资源的利用效率,如果a < b返回ture 反之返回false

selector_existing.go

新建一个存在选择器
存在选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
compare,比较两个工人的对资源的利用效率,如果a< b返回ture 反之返回false

selector_task.

新建一个任务选择器
任务选择器初始化:获取支持工人的工作类型
compare,比较。分别获取ab两个工人的工作类型,选取达到目标做的较少的工人

杂项 零散的模块

Sector-Storage Scheduler模块层次包含情况

scheduler:
	map[WorkerID]*workerHandle
	chan *workerRequest
	chan *schedWindowRequest
		WorkerID
		schedWindow
			activeResources
			[]*workerRequest
	chan workerDisableReq
		[]*schedWindow
		WorkerID
	*requestQueue
		[]*workerRequest
	[]*schedWindowRequest
		schedWindow
		chan *schedWindow
	*workTracker
		sync.Mutex
		map[storiface.CallID]struct{}
			abi.SectorID
			uuid.UUID
		 map[storiface.CallID]trackedWork
			trackedWork
				storiface.WorkerJob
					CallID
					abi.SectorID
					sealtasks.TaskType
					运行等待时间
					开始时间
					hostname
				WorkerID
				string
		trackedWorker
			WorkerID
			storiface.WorkerInfo
			*workTracker

workerHandle:
	Worker
	WorkerInfo
	activeResources
	[]*schedWindow
schedWindow:
	activeResources
	[]*workerRequest
		workerRequest
			WorkerAction
			sealtasks.TaskType
			storage.SectorRef
			WorkerSelector
			chan<- workerResponse

调度情况模块1 scheduler模块

  1. 资源分配路径: runSchend() -> trySched() [schedWindow] runSched()工作过程: 进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。 第一阶段进入select监听状态,如果有请求加入则把请求加入请求队列中,如果有请求窗口进入则把窗口分配到已打开窗口的数组当中。当调度器关闭的时候则退出第一状态。 第二状态是在在初始化和调度器开始进行分配动作[比如workerChange workerDisable等情况发生]之后开始,同样是进入循环监听状态如果此时scheduler没有任何改变则退出第二监听状态。 之后对数据进行处理,对请求窗口等信息保存的对应的容器中。然后调用trySched()函数正式进行资源分配

trySched()工作过程: 尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄

  1. 为每个调度队列中的任务找到能处理他们的可容纳窗口 1.1 创建解决任务的窗口容量的列表 acceptableWindws slice 1.2 根据任务选择器的性能为窗口排序
  2. 再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
  3. 把被调度后的窗口提交给Worker
  4. 资源调度关闭路径: schedClose() -> workerCleanup() scheduler调度器管理workers,schedClose()函数遍历所有的worker对每个worker执行对象的方法workerCleanup()来清空worker占用的资源并将其从数组中删除实现关闭。
  5. activeSource 资源分配的实现路径: withResources() -> [add free等接口函数](获得简单信息的函数)

调度情况模块2 worker模块

func (wh *workerHandle) utilization() ->func (a *activeResources) utilization workHandle结构体调用活跃资源的方法来获得当前资源的占用情况

  1. worker资源调度路径:

func (sh *scheduler) runWorker() -> func (sw *schedWorker) go sw.handleWorker() 工作过程:->func (sw *schedWorker) workerCompactWindows() ->func (sw *schedWorker) processAssignedWindows()

runWorker()该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数。 在存储好schedWorker worker资源调度清单之后 执行go sw.handleWorker(),开启一个goroutinue在后台并行执行。 handleWorker是一个专门处理worker调度的方法,他是处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口

  1. worker_tracked worker跟踪部分路径:

SealPreCommit1() SealPreCommit2() SealCommit1()...等函数调用track方法 ->func (wt *workTracker) track -> 匿名函数

调度情况模块3 Manager模块

调度情况模块4 Selector模块

selector模块没有特别的调度线路,仅仅提供了三种选择器的函数方法以供调用。

原文地址:Lotus | Filecoin | Sector-storage部分源码与模块理解 - 知乎 (zhihu.com)

标签:Sector,函数,Lotus,worker,调度,源码,模块,func,scheduler
来源: https://www.cnblogs.com/python-learn/p/15913152.html