Lotus | Filecoin | Sector-storage部分源码与模块理解
作者:互联网
深入理解存储与管理
Filecoin的存储单元称之为扇区(机械硬盘的最小存储单元就叫做扇区)。在miner进行数据存储的时候会向filecoin网络提供一系列的证明保证正确存储了数据。在证明sector存储的过程中需要经历一系列的处理,P1/P2/C1/C2。P-Precommit,预提交;C-commit 提交。 Filecoin的存储管理逻辑主要是现在sector-storage中。 在这之中 Worker、Sechduler、Manager三者是绕不开的话题。
相关模块
Woekr主要分为localworker以及remoteworker,localworker处理本地服务,remoteworker支持远程服务处理。 Manager-管理多个Worker Sechduler-调度器调度多个Worker,一个Manager通常都会有一个Sehcduler Store-Store存储系统
README图解
README中的图分为上下两个部分,上面是Manager它包含了:store存储系统、localWorker和scheduler下面是remoteWorker。下面灰色的部分都是remoteWorker manager对上面的模块进行管理,使用scheduler对Worker进行管理,通常一个Manager对应一个Worker。Worker分为localworker以及remoteworker,
- Manager负责给每一个存储模块sectorIndex 这标志了数据存储的扇区编号
- manager通过访问JsonRPC接口来实现对remote worker的管理
- 使用manager的fetchHandle API接口来实现文件的传输。
- specs-storage.Prover/Sealer/Storage实现sector的证明、封装以及存储
- 远程的remote Worker的资源调度也由本地的scheduler控制
- 每个连接到Manager的Worker会和Manager同步它的内存/CPU以及显存的信息。Scheduler在接受到新的请求时,会针对请求(Task)的类型以及资源的需求,从当前Worker中挑选最合适的Worker进行请求的处理。如何选择Worker,可以查看selector的相关逻辑。
工作流程:Manager管理scheduler调度调度器,调度器管理多个worker。每一个连接到manager的worker都活同步他的CPU内存以及显存的信息。worker分为localworker以及remoteworker scheduler接受请求,根据请求类型以及资源请求选择合适的worker为他进行资源分配
主要模块:scheduler worker manager
模块1 schedule调度模块
模块的主要功能:资源分配与调度 模块文件:resource.go/cbor_gen.go/sched.go/sched_resource.go/sche_worker.go
resource.go模块
此模块描述了运行时资源分配情况以及性能参数。
重要数据结构:
- resource结构体
描述资源情况,他的方法是对Thread进行资源的分配
type Resources struct {
MinMemory uint64
MaxMemory uint64
MaxParallelism int
CanGPU bool
BaseMinMemory uint64
}
重要函数方法:
- func (r Resources) Thr\ads(wcpus uint64) uint64
线程资源分配函数
- var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
资源情况映射表,通过封装任务的Task类型和封装证明映射到对应的资源,用于查询此任务的资源占用情况。
/*
Percent of threads to allocate to parallel tasks
12 * 0.92 = 11
16 * 0.92 = 14
24 * 0.92 = 22
32 * 0.92 = 29
64 * 0.92 = 58
128 * 0.92 = 117
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
n := (wcpus * ParallelNum) / ParallelDenom //计算公式
- func init()
初始化resrouceTable的函数方法
cbor_gen.go模块
做各种角色的JSON包和cbor之间的转换 。编码和解码的集合,对象分别是 Call、WorkStae、 WorkID
重要方法函数:
- func (t *Call) MarshalCBOR(w io.Writer) error
将调用信息的JSON数据转换成cbor简明二进制展现
- func (t *Call) UnmarshalCBOR(r io.Reader) error
将cbor数据转换成数据结构打包的JSON数据
- func (t *WorkState) MarshalCBOR(w io.Writer) error
- func (t *WorkState) UnmarshalCBOR(r io.Reader) error
- func (t *WorkID) MarshalCBOR(w io.Writer) error
- func (t *WorkID) UnmarshalCBOR(r io.Reader) error
sched.go模块
进行资源调度和分配的核心模块 是scheduler调度器方法的主要定义部分
关键类型::
- schedWindow
用于描述调度窗口,存放了worker的请求数组
type schedWindow struct {
allocated activeResources
todo []*workerRequest
}
- SchedPriorityKey
优先级类型
- context.Context
用context表示JSON数据格式类型
- WorkerAction
表示一个匿名函数
type WorkerAction func(ctx context.Context, w Worker) error
- scheduler
一个很重要的类型——调度器——用于分配计算机资源给不同的任务.在Manager类与WorkScheduler类中都有他的身影
//分配器
type scheduler struct {
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
workerChange chan struct{} // worker added / changed/freed resources
workerDisable chan workerDisableReq
// owned by the sh.runSched goroutine
schedQueue *requestQueue
openWindows []*schedWindowRequest
workTracker *workTracker
info chan func(interface{})
//关闭中的 已经关闭的 测试同步通道
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
}
- workerHandle
这个结构体根据目前的资源情况去调度分配 对sector、Windows、activeResource进行操作
- activeResources
计算机目前的活跃资源
- schedWindow
资源分配窗口 显示当前计算机的活跃资源以及矿工提出的请求的序列
- schedWindowRequest
矿工请求处理窗口 把矿工所请求的已完成的任务存储到通道中 用此数据结构表示并存储一个矿工已经完成的任务
- workerRequest
矿工具体的请求的数据结构。内部的请求信息包含了山区ID、证明信息、封装任务的任务类型、该任务的优先级、任务的开始时间、在heap存储空间中的索引下标、worker错误相应通道等
- workerResponse
错误响应信息的数据结构
- SchedDiagRequestInfo
分配诊断请求信息结构体。包含了扇区ID 、封装任务类型以及该调度信息的优先级
- SchedDiagInfo
资源诊断信息结构体,包含了请求信息的集合以及打开的窗口的集合
关键函数方法:
- getPriority
从context中获取任务的优先级
- WithPriority
给任务赋上优先级
- func (r *workerRequest) respond(err error)1
处理worker的错误信息 打包成error类并返会
- func newScheduler() *scheduler
创建一个新的调度器 返回该调度器的指针
- func (sh *scheduler) Schedule
调度器的成员函数 ,开始进入监听状态等待矿工的请求
- func (sh *scheduler) runSched()
进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。
- func (sh *scheduler) trySched()
尝试进行资源分配的核心过程函数。
任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
1. 为每个调度队列中的任务找到能处理他们的可容纳窗口
1.1 创建解决任务的窗口容量的列表 acceptableWindws slice
1.2 根据任务选择器的性能为窗口排序
2. 再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
3. 把被调度后的窗口提交给Worker
- func (sh *scheduler) schedClose()
关闭调度处理的函数,遍历矿工列表,清理为他们分配的资源,释放矿工所占用的窗口
- func (sh *scheduler) Info(ctx context.Context) (interface{}, error)
获取调度器信息的函数
- func (sh *scheduler) Close(ctx context.Context) error
关闭调度器函数 关闭失败则返回报错信息
sched_resource.go模块
分配器-计算机资源方面进行查询判断操作的包
关键函数方法::
- func (a *activeResources) withResources
(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error
对worker分配资源的入口函数 并把它所需要的资源加入activeResource活动资源的结构体中
- func (a *activeResources) add(wr storiface.WorkerResources, r Resources)
把worker所需要的资源加入到activeResource中的具体实现
- func (a *activeResources) free(wr storiface.WorkerResources, r Resources)
释放woker所占的资源 并删除冲activeResource中删除对应的资源信息
- func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool
查询目前的activeResource是否足够该woker所请求的资源 返回布尔值 true表示成功 false表示不能分配
- func (a *activeResources) utilization(wr storiface.WorkerResources) float64
查看目前活跃资源当中CPU占用率 最小内存空间和最大内存空间占用率 返回三个当中一个最大的,活动资源的利用率
- func (wh *workerHandle) utilization() float64
返回woker占用的所有活动资源的占用率之和
sched_worker.go
woker分配数据结构。统揽。定义了schedWorker以及此结构体的一众方法 , Woker资源调度的具体实现
关键数据结构:
- schedWorker
用于对Worker进行资源分配的结构体,schedWorker结构 包含了一个资源调度器 wokerer信息查询 workerID 分配窗口的通道 请求窗口数量
type schedWorker struct {
sched *scheduler
worker *workerHandle
wid WorkerID
heartbeatTimer *time.Ticker
scheduledWindows chan *schedWindow
taskDone chan struct{}
windowsRequested int
}
- workHandle
schedWorker通过workHandle来对worker进行管理 资源分配。workerHandle处理结构体:
type workerHandle struct {
workerRpc Worker //工人的rpc调用接口
info storiface.WorkerInfo //工人的信息
preparing *activeResources
active *activeResources
lk sync.Mutex
wndLk sync.Mutex
activeWindows []*schedWindow
enabled bool
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
closingMgr chan struct{}
}
- scheduler
这是核心的调度器数据结构的定义, 调度器的workers大映射表是从WorkerID到workerHandle解决方案
type scheduler struct {
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
workerChange chan struct{} // worker added / changed/freed resources
workerDisable chan workerDisableReq
// owned by the sh.runSched goroutine
schedQueue *requestQueue
openWindows []*schedWindowRequest
workTracker *workTracker
info chan func(interface{})
//关闭中的 已经关闭的 测试同步通道
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
}
关键函数方法::
- func (sh *scheduler) runWorker(ctx context.Context, w Worker) error
该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数;
context类型的参数用于读取其其中的信息以对后续的worker调度进行设置
- func (sw *schedWorker) handleWorker()
处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。 空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
- func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle)
worker清空函数,关闭worker的manager管理器以及worker,用内置close()函数关闭manager。通过删除调度器中打开窗口中的wokerID窗口,从而删除worker。
- func (sw *schedWorker) disable(ctx context.Context) error
使得context所表示的内容无效化。先在主调度器的线程中等待清理程序,再等待清理程序完成以使其无效化,最终清空活动窗口,使得请求窗口清0
- func (sw *schedWorker) checkSession(ctx context.Context) bool
对session进行检查
- func (sw *schedWorker) requestWindows() bool
检查所有的请求窗口
- func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool)
等待窗口的更新或者是worker任务的完成
- func (sw *schedWorker) workerCompactWindows()
把老窗口中的任务挪动到新窗口当中,将worker和窗口绑定在一起
- func (sw *schedWorker) processAssignedWindows()
将任务分配到目前所有的活动窗口中
- func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error
开始执行一个任务的入口函数
核心函数:
runSched():准备进入资源调度状态,执行此函数则进入监听状态,监听scheduler对象中通道的变化,并作出对应的决策
trySched():调度资源的核心实现,把请求的任务分配给窗口去
runWorker():开始对worker的请求进行处理,如果在其他线程中存在解决方案则直接返回,否则开始处理,开启一个goroutinue
模块2 Manager模块
模块文件:Manager.go\Manager_calltracker.go
Manager.go
重要数据结构:
- Manager 结构体
包含manger的各种信息,结构体中包含了本地存储的接口、远程存储信息、本地存储类、Scheduler调度器以及storiface.CallID到WorkerID和chan result的映射等。
- result结构体
存储结果信息,有两个参数:interface{}结构体和Error出错信息类。
- sealerConfig结构体
参数中含有能并行获取的数量限制以及一系列布尔型的变量,用于配置。 布尔型变量中包含了:AllowCommit 允许提交/AllowAddPiece 允许增加piece/AllowPreCommit1 允许预提交1/AllowPreCommit2 允许预提交2/AllowUnseal 允许解封扇区。
重要函数方法:
- func New(...) (*Manager, error)
New这个函数负责创建一个新的manager,执行过程大致为:
1.首先新建一个本地的存储点,如果创建失败则跳出函数返回错误信息
2.随后新建一个provider,如果创建失败则跳出函数返回错误信息
3.新建一个线上(远距离)的存储点
4.将新的manager信息放入参数m中,设置m的基本参数并运行
5.新建任务,并且设置任务的基本参数 也就是sealerConfig中的参数
6.增加工人,如果增加失败返回错误信息
7.新建manager完成
- addLocalStorage
这函数功能是增加当地存储空间,会报三种错误1.增加储存空间错误2.已经存在该存储空间错误3.存储空间配置错误
- AddWorker
增加worker,调用sched文件中的runWork函数。开始进行worker请求处理,如果在其他线程中存在解决方案则直接返回,否则开始处理(这个地方的请求是增加Worker 并且传递过去了Worker类型的参数
- ServerHTTP
提供远程的HTTP服务
- schedNop
空计划指令,什么都不进行操作
- schedFetch
获取计划,经理通过等待worker获得计划
- readPiece
从指定扇区中取出一部分数据,通过调用函数tryReadUnsealedPiece()来尝试读取扇区中已经被解封的数据
- tryReadUnsealedPiece
尝试读取没封装的数据
- AddPiece
在sector中新建文件存储片区,执行步骤:1.判锁2.查看是否有存在的片区,如果有则分配,如果没有则新建3.录入片区的一系列信息并返回
- SealPreCommit1 SealPreCommit2 SealCommit1 SealCommit2
执行上述这一系列操作
- FinalizeSector
这个模块就是将需要存储的信息传递给manager并且释放worker中扇区和缓存中无用的信息。新建选择其,并调用manager中的scheduler对象的方法,为worker分配资源。
- ReleaseUnsealed
释放没有被封装的(函数里也没有具体的实现)只有一句提示“即将执行释放操作”
- Remove
移除封装或者未封装的扇区,或者移除缓冲区
- 一系列Return函数
代码复用,返回是否执行成功的结果
该函数方法总结:
此函数定义了大量的接口函数,通过manager进行资源调度、存储分配、远程HTTP服务、读取解封之后的扇区中的数据操作
manager_calltracker.go
重要数据结构:
- WorkID
代表了工作的代号,参数有method表示worker所进行封装的封装任务的任务类型
- Params参数
用于表示JSON格式的参数
- WorkState
代表工作状态的结构体,包含了工作的编号WorkID,工作的阶段(已经开始、执行中、已完成)、storiface.CallID到WorkID的绑定容器、专门用于保存work的错误、worker的主机名以及Work开始时间
- type WorkStatus string
这是一个枚举类型,具体表示了三种工作状态
const (
wsStarted WorkStatus = "started" // task started, not scheduled/running on a worker yet
wsRunning WorkStatus = "running" // task running on a worker, waiting for worker return
wsDone WorkStatus = "done" // task returned from the worker, results available
)
重要函数方法:
- newWorkID
新建工作 会出现调度错误,最后返回WorkID对象
- setupWorkTracker
这个模块主要就是负责对正在执行的工作设置追踪器(只有处于状态wsRunning才会设置)
- getWork
获得工作状态,并执行cancel操作,将出现错误的程序或者没有被追踪的程序cancel掉。 1.新建一个workid 2.查看是否新建成功 3.查看workid是否存在,如果不存在返回错误 4.如果不存在则开始一个新的work,并且根据该work的状态执行操作 5.如果workid存在 说明程序已经运行或者已经被追踪,则返回true
- startWork
开始工作并设置追踪其
- waitWork
首先获取Work的状态,获取失败则报错,其次判断work是否处于started阶段 是则返回,最后判断work是否被正确追踪,错误则返回
- waitSimpleCall
调用waitCall 等待调用
- ReturnResult
返回结果信息
- Abort
允许中途结束work
模块3 Worker工作者模块
模块文件:worker_ calltracker.go worker_local.go worker_tracked.go
worker_calltracker.go
主要是描述call指令的几种状态:开始、完成、结果返回、未完成
worker_local.go
关键数据结构:
- WorkerConfig
此类中包含了工作种类和是否允许交换的标志
- worker_local
存储空间、本地存储、扇区标号、返回信息、执行操作、是否允许交换、指令追踪、已经接受的任务、运行、任务锁、阶段、是否可以工作、关闭
关键函数方法:
- newlocalworker
新建一个本地的工人
- AcquireSector
申请一块扇区
- rfunc
- asyncCall
异步调用 主要负责跟踪调用Call
- doReturn
尝试将操作结果返回给manager,如果操作成功就返回ture
- NewSector
新建扇区
- AddPiece
在扇区中新增存储块
- SealPreCommit1
- SealPreCommit2
- SealConmmit1
- SealConmmit2
- FinalizeSector
结束部分 删除不需要的信息 避免其占用内存
- ReleaseUnsealed
释放未封装的存储数据
- Remove
解封并清除扇区或者缓存中的信息
- MoveStorage
转移存储
- UnsealPiece
解封存储块,并且将没有用的信息删除
- Info
获取存储、cpu、gpu等信息
静态设置
- AddPiece ReturnType = "AddPiece"
- SealPreCommit1 ReturnType = "SealPreCommit1"
- SealPreCommit2 ReturnType = "SealPreCommit2"
- SealCommit1 ReturnType = "SealCommit1"
- SealCommit2 ReturnType = "SealCommit2"
- FinalizeSector ReturnType = "FinalizeSector"
- ReleaseUnsealed ReturnType = "ReleaseUnsealed"
- MoveStorage ReturnType = "MoveStorage"
- UnsealPiece ReturnType = "UnsealPiece"
- ReadPiece ReturnType = "ReadPiece"
- Fetch ReturnType = "Fetch"
模块4 Seletor 选择器模块
模块关键包含:selector_alloc.go selector_existing.go selector_task.go
selector_alloc.go
重要函数方法:
- newAllocSelector
新建一个分配选择器
- OK
分配选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
- cmp
compare,比较两个工人的对资源的利用效率,如果a < b返回ture 反之返回false
selector_existing.go
- newExistingSelector
新建一个存在选择器
- OK
存在选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
- cmp
compare,比较两个工人的对资源的利用效率,如果a< b返回ture 反之返回false
selector_task.
- newTaskSelector
新建一个任务选择器
- OK
任务选择器初始化:获取支持工人的工作类型
- cmp
compare,比较。分别获取ab两个工人的工作类型,选取达到目标做的较少的工人
杂项 零散的模块
Sector-Storage Scheduler模块层次包含情况
scheduler:
map[WorkerID]*workerHandle
chan *workerRequest
chan *schedWindowRequest
WorkerID
schedWindow
activeResources
[]*workerRequest
chan workerDisableReq
[]*schedWindow
WorkerID
*requestQueue
[]*workerRequest
[]*schedWindowRequest
schedWindow
chan *schedWindow
*workTracker
sync.Mutex
map[storiface.CallID]struct{}
abi.SectorID
uuid.UUID
map[storiface.CallID]trackedWork
trackedWork
storiface.WorkerJob
CallID
abi.SectorID
sealtasks.TaskType
运行等待时间
开始时间
hostname
WorkerID
string
trackedWorker
WorkerID
storiface.WorkerInfo
*workTracker
workerHandle:
Worker
WorkerInfo
activeResources
[]*schedWindow
schedWindow:
activeResources
[]*workerRequest
workerRequest
WorkerAction
sealtasks.TaskType
storage.SectorRef
WorkerSelector
chan<- workerResponse
调度情况模块1 scheduler模块
- 资源分配路径: runSchend() -> trySched() [schedWindow] runSched()工作过程: 进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。 第一阶段进入select监听状态,如果有请求加入则把请求加入请求队列中,如果有请求窗口进入则把窗口分配到已打开窗口的数组当中。当调度器关闭的时候则退出第一状态。 第二状态是在在初始化和调度器开始进行分配动作[比如workerChange workerDisable等情况发生]之后开始,同样是进入循环监听状态如果此时scheduler没有任何改变则退出第二监听状态。 之后对数据进行处理,对请求窗口等信息保存的对应的容器中。然后调用trySched()函数正式进行资源分配
trySched()工作过程: 尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
- 为每个调度队列中的任务找到能处理他们的可容纳窗口 1.1 创建解决任务的窗口容量的列表 acceptableWindws slice 1.2 根据任务选择器的性能为窗口排序
- 再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口
- 把被调度后的窗口提交给Worker
- 资源调度关闭路径: schedClose() -> workerCleanup() scheduler调度器管理workers,schedClose()函数遍历所有的worker对每个worker执行对象的方法workerCleanup()来清空worker占用的资源并将其从数组中删除实现关闭。
- activeSource 资源分配的实现路径: withResources() -> [add free等接口函数](获得简单信息的函数)
调度情况模块2 worker模块
func (wh *workerHandle) utilization() ->func (a *activeResources) utilization workHandle结构体调用活跃资源的方法来获得当前资源的占用情况
- worker资源调度路径:
func (sh *scheduler) runWorker() -> func (sw *schedWorker) go sw.handleWorker() 工作过程:->func (sw *schedWorker) workerCompactWindows() ->func (sw *schedWorker) processAssignedWindows()
runWorker()该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数。 在存储好schedWorker worker资源调度清单之后 执行go sw.handleWorker(),开启一个goroutinue在后台并行执行。 handleWorker是一个专门处理worker调度的方法,他是处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
- worker_tracked worker跟踪部分路径:
SealPreCommit1() SealPreCommit2() SealCommit1()...等函数调用track方法 ->func (wt *workTracker) track -> 匿名函数
调度情况模块3 Manager模块
调度情况模块4 Selector模块
selector模块没有特别的调度线路,仅仅提供了三种选择器的函数方法以供调用。
原文地址:Lotus | Filecoin | Sector-storage部分源码与模块理解 - 知乎 (zhihu.com)
标签:Sector,函数,Lotus,worker,调度,源码,模块,func,scheduler 来源: https://www.cnblogs.com/python-learn/p/15913152.html