其他分享
首页 > 其他分享> > 彻底搞懂kubernetes调度框架与插件

彻底搞懂kubernetes调度框架与插件

作者:互联网

调度框架 [1]

本文基于 kubernetes 1.24 进行分析

调度框架(Scheduling Framework)是Kubernetes 的调度器 kube-scheduler 设计的的可插拔架构,将插件(调度算法)嵌入到调度上下文的每个扩展点中,并编译为 kube-scheduler

kube-scheduler 1.22 之后,在 pkg/scheduler/framework/interface.go 中定义了一个 Plugininterface,这个 interface 作为了所有插件的父级。而每个未调度的 Pod,Kubernetes 调度器会根据一组规则尝试在集群中寻找一个节点。

type Plugin interface {
	Name() string
}

下面会对每个算法是如何实现的进行分析

在初始化 scheduler 时,会创建一个 profile,profile是关于 scheduler 调度配置相关的定义

func New(client clientset.Interface,
...
	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)
	if err != nil {
		return nil, fmt.Errorf("initializing profiles: %v", err)
	}

	if len(profiles) == 0 {
		return nil, errors.New("at least one profile is required")
	}
....
}

关于 profile 的实现,则为 KubeSchedulerProfile,也是作为 yaml生成时传入的配置

// KubeSchedulerProfile 是一个 scheduling profile.
type KubeSchedulerProfile struct {
    // SchedulerName 是与此配置文件关联的调度程序的名称。
    // 如果 SchedulerName 与 pod “spec.schedulerName”匹配,则使用此配置文件调度 pod。
	SchedulerName string

    // Plugins指定应该启用或禁用的插件集。
    // 启用的插件是除了默认插件之外应该启用的插件。禁用插件应是禁用的任何默认插件。
    // 当没有为扩展点指定启用或禁用插件时,将使用该扩展点的默认插件(如果有)。
    // 如果指定了 QueueSort 插件,
    // 则必须为所有配置文件指定相同的 QueueSort Plugin 和 PluginConfig。
    // 这个Plugins展现的形式则是调度上下文中的所有扩展点(这是抽象),实际中会表现为多个扩展点
	Plugins *Plugins

	// PluginConfig 是每个插件的一组可选的自定义插件参数。
    // 如果省略PluginConfig参数等同于使用该插件的默认配置。
	PluginConfig []PluginConfig
}

对于 profile.NewMap 就是根据给定的配置来构建这个framework,因为配置可能是存在多个的。而 Registry 则是所有可用插件的集合,内部构造则是 PluginFactory ,通过函数来构建出对应的 plugin

func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
	m := make(Map)
	v := cfgValidator{m: m}

	for _, cfg := range cfgs {
		p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
		if err != nil {
			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
		}
		if err := v.validate(cfg, p); err != nil {
			return nil, err
		}
		m[cfg.SchedulerName] = p
	}
	return m, nil
}

// newProfile 给的配置构建出一个profile
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
	recorder := recorderFact(cfg.SchedulerName)
	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
}

可以看到最终返回的是一个 Framework 。那么来看下这个 Framework

Framework 是一个抽象,管理着调度过程中所使用的所有插件,并在调度上下文中适当的位置去运行对应的插件

type Framework interface {
	Handle
	// QueueSortFunc 返回对调度队列中的 Pod 进行排序的函数
    // 也就是less,在Sort打分阶段的打分函数
	QueueSortFunc() LessFunc
    
    // RunPreFilterPlugins 运行配置的一组PreFilter插件。
    // 如果这组插件中,任何一个插件失败,则返回 *Status 并设置为non-success。
    // 如果返回状态为non-success,则调度周期中止。
    // 它还返回一个 PreFilterResult,它可能会影响到要评估下游的节点。
    
	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)

    // RunPostFilterPlugins 运行配置的一组PostFilter插件。 
    // PostFilter 插件是通知性插件,在这种情况下应配置为先执行并返回 Unschedulable 状态,
    // 或者尝试更改集群状态以使 pod 在未来的调度周期中可能会被调度。
	RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)

    // RunPreBindPlugins 运行配置的一组 PreBind 插件。
    // 如果任何一个插件返回错误,则返回 *Status 并且code设置为non-success。
    // 如果code为“Unschedulable”,则调度检查失败,
    // 则认为是内部错误。在任何一种情况下,Pod都不会被bound。
	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // RunPostBindPlugins 运行配置的一组PostBind插件
	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    // RunReservePluginsReserve运行配置的一组Reserve插件的Reserve方法。
    // 如果在这组调用中的任何一个插件返回错误,则不会继续运行剩余调用的插件并返回错误。
    // 在这种情况下,pod将不能被调度。
	RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // RunReservePluginsUnreserve运行配置的一组Reserve插件的Unreserve方法。
	RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    // RunPermitPlugins运行配置的一组Permit插件。
    // 如果这些插件中的任何一个返回“Success”或“Wait”之外的状态,则它不会继续运行其余插件并返回错误。
    // 否则,如果任何插件返回 “Wait”,则此函数将创建等待pod并将其添加到当前等待pod的map中,
    // 并使用“Wait” code返回状态。 Pod将在Permit插件返回的最短持续时间内保持等待pod。
	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    // 如果pod是waiting pod,WaitOnPermit 将阻塞,直到等待的pod被允许或拒绝。
	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status

    // RunBindPlugins运行配置的一组bind插件。 Bind插件可以选择是否处理Pod。
    // 如果 Bind 插件选择跳过binding,它应该返回 code=5("skip")状态。
    // 否则,它应该返回“Error”或“Success”。
    // 如果没有插件处理绑定,则RunBindPlugins返回code=5("skip")的状态。
	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

	// 如果至少定义了一个filter插件,则HasFilterPlugins返回true
	HasFilterPlugins() bool

    // 如果至少定义了一个PostFilter插件,则HasPostFilterPlugins返回 true。
	HasPostFilterPlugins() bool

	// 如果至少定义了一个Score插件,则HasScorePlugins返回 true。
	HasScorePlugins() bool

    // ListPlugins将返回map。key为扩展点名称,value则是配置的插件列表。
	ListPlugins() *config.Plugins

    // ProfileName则是与profile name关联的framework
	ProfileName() string
}

而实现这个抽象的则是 frameworkImplframeworkImpl 是初始化与运行 scheduler plugins 的组件,并在调度上下文中会运行这些扩展点

type frameworkImpl struct {
   registry             Registry
   snapshotSharedLister framework.SharedLister
   waitingPods          *waitingPodsMap
   scorePluginWeight    map[string]int
   queueSortPlugins     []framework.QueueSortPlugin
   preFilterPlugins     []framework.PreFilterPlugin
   filterPlugins        []framework.FilterPlugin
   postFilterPlugins    []framework.PostFilterPlugin
   preScorePlugins      []framework.PreScorePlugin
   scorePlugins         []framework.ScorePlugin
   reservePlugins       []framework.ReservePlugin
   preBindPlugins       []framework.PreBindPlugin
   bindPlugins          []framework.BindPlugin
   postBindPlugins      []framework.PostBindPlugin
   permitPlugins        []framework.PermitPlugin

   clientSet       clientset.Interface
   kubeConfig      *restclient.Config
   eventRecorder   events.EventRecorder
   informerFactory informers.SharedInformerFactory

   metricsRecorder *metricsRecorder
   profileName     string

   extenders []framework.Extender
   framework.PodNominator

   parallelizer parallelize.Parallelizer
}

那么来看下 Registry ,Registry 是作为一个可用插件的集合。framework 使用 registry 来启用和对插件配置的初始化。在初始化框架之前,所有插件都必须在注册表中。表现形式就是一个 map[]key 是插件的名称,value是 PluginFactory

type Registry map[string]PluginFactory

而在 pkg\scheduler\framework\plugins\registry.go 中会将所有的 in-tree plugin 注册进来。通过 NewInTreeRegistry 。后续如果还有插件要注册,可以通过 WithFrameworkOutOfTreeRegistry 来注册其他的插件。

func NewInTreeRegistry() runtime.Registry {
	fts := plfeature.Features{
		EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
		EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
		EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
		EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
	}

	return runtime.Registry{
		selectorspread.Name:                  selectorspread.New,
		imagelocality.Name:                   imagelocality.New,
		tainttoleration.Name:                 tainttoleration.New,
		nodename.Name:                        nodename.New,
		nodeports.Name:                       nodeports.New,
		nodeaffinity.Name:                    nodeaffinity.New,
		podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
		nodeunschedulable.Name:               nodeunschedulable.New,
		noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
		noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
		volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
		volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
		volumezone.Name:                      volumezone.New,
		nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
		nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
		nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
		nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
		nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
		interpodaffinity.Name:                interpodaffinity.New,
		queuesort.Name:                       queuesort.New,
		defaultbinder.Name:                   defaultbinder.New,
		defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
	}
}

这里插入一个题外话,关于 in-tree plugin

在这里没有找到关于,kube-scheduler ,只是找到有关的概念,大概可以解释为,in-tree表示为随kubernetes官方提供的二进制构建的 plugin 则为 in-tree,而独立于kubernetes代码库之外的为 out-of-tree [3] 。这种情况下,可以理解为,AA则是 out-of-treePod, DeplymentSet 等是 in-tree

接下来回到初始化 scheduler ,在初始化一个 scheduler 时,会通过NewInTreeRegistry 来初始化

func New(client clientset.Interface,
	....
	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}
         
	...

	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)
	...
}

接下来在调度上下文 scheduleOneschedulePod 时,会通过 framework 调用对应的插件来处理这个扩展点工作。具体的体现在,pkg\scheduler\schedule_one.go 中的预选阶段

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

与其他扩展点部分,在调度上下文 scheduleOne 中可以很好的看出,功能都是 framework 提供的。

func (sched *Scheduler) scheduleOne(ctx context.Context) {

    ...
    
	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)

    ...
    
	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
	}

    ...
    
	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	
		// One of the plugins returned status different than success or wait.
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

...
    
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		...
		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			...
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		
        ...
        
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	
        ...

		...
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			
        ...

		// Run "postbind" plugins.
		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

	...
}

插件 [4]

插件(Plugins)(也可以算是调度策略)在 kube-scheduler 中的实现为 framework plugin,插件API的实现分为两个步骤:registerconfigured,然后都实现了其父方法 Plugin。然后可以通过配置(kube-scheduler --config 提供)启动或禁用插件;除了默认插件外,还可以实现自定义调度插件与默认插件进行绑定。

type Plugin interface {
    Name() string
}
// sort扩展点
type QueueSortPlugin interface {
    Plugin
    Less(*v1.pod, *v1.pod) bool
}
// PreFilter扩展点
type PreFilterPlugin interface {
    Plugin
    PreFilter(context.Context, *framework.CycleState, *v1.pod) error
}

插件的载入过程

scheduler 被启动时,会 scheduler.New(cc.Client.. 这个时候会传入 profiles,整个的流如下:

NewScheduler

我们了解如何 New 一个 scheduler 即为 Setup 中去配置这些参数,

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {

    ...
    
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.DynInformerFactory,
		recorderFactory,
		ctx.Done(),
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
			completedProfiles = append(completedProfiles, profile)
		}),
	)
    ...
}

profile.NewMap

scheduler.New 中,会根据配置生成profile,而 profile.NewMap 会完成这一步

func New(client clientset.Interface,
	...
         
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)

	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithPodNominator(nominator),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
	)

         ...
}

NewFramework

newProfile 返回的则是一个创建好的 framework

func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
	recorder := recorderFact(cfg.SchedulerName)
	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
}

最终会走到 pluginsNeeded,这里会根据配置中开启的插件而返回一个插件集,这个就是最终在每个扩展点中药执行的插件。

func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.String {
	pgSet := sets.String{}

	if plugins == nil {
		return pgSet
	}

	find := func(pgs *config.PluginSet) {
		for _, pg := range pgs.Enabled {
			pgSet.Insert(pg.Name)
		}
	}
	// 获取到所有的扩展点,找到为Enabled的插件加入到pgSet
	for _, e := range f.getExtensionPoints(plugins) {
		find(e.plugins)
	}
	// Parse MultiPoint separately since they are not returned by f.getExtensionPoints()
	find(&plugins.MultiPoint)

	return pgSet
}

插件的执行

在对插件源码部分分析,会找几个典型的插件进行分析,而不会对全部的进行分析,因为总的来说是大同小异,分析的插件有 NodePortsNodeResourcesFitpodtopologyspread

NodePorts

这里以一个简单的插件来分析;NodePorts 插件用于检查Pod请求的端口,在节点上是否为空闲端口。

NodePorts 实现了 FilterPluginPreFilterPlugin

PreFilter 将会被 frameworkPreFilter 扩展点被调用。

func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	s := getContainerPorts(pod) // 或得Pod得端口
    // 写入状态
	cycleState.Write(preFilterStateKey, preFilterState(s))
	return nil, nil
}

Filter 将会被 frameworkFilter 扩展点被调用。

// Filter invoked at the filter extension point.
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   wantPorts, err := getPreFilterState(cycleState)
   if err != nil {
      return framework.AsStatus(err)
   }

   fits := fitsPorts(wantPorts, nodeInfo)
   if !fits {
      return framework.NewStatus(framework.Unschedulable, ErrReason)
   }

   return nil
}

func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
	// 对比existingPorts 和 wantPorts是否冲突,冲突则调度失败
	existingPorts := nodeInfo.UsedPorts
	for _, cp := range wantPorts {
		if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
			return false
		}
	}
	return true
}

New ,初始化新插件,在 register 中注册得

func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &NodePorts{}, nil
}

在调用中,如果有任何一个插件返回错误,则跳过该扩展点注册得其他插件,返回失败。

func (f *frameworkImpl) RunFilterPlugins(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
	statuses := make(framework.PluginToStatus)
	for _, pl := range f.filterPlugins {
		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
		if !pluginStatus.IsSuccess() {
			if !pluginStatus.IsUnschedulable() 
				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
				return map[string]*framework.Status{pl.Name(): errStatus}
			}
			pluginStatus.SetFailedPlugin(pl.Name())
			statuses[pl.Name()] = pluginStatus
		}
	}

	return statuses
}

返回得状态是一个 Status 结构体,该结构体表示了插件运行的结果。由 Codereasons、(可选)errfailedPlugin (失败的那个插件名)组成。当 code 不是 Success 时,应说明原因。而且,当 codeSuccess 时,其他所有字段都应为空。nil 状态也被视为成功。

type Status struct {
	code    Code
	reasons []string
	err     error
	// failedPlugin is an optional field that records the plugin name a Pod failed by.
	// It's set by the framework when code is Error, Unschedulable or UnschedulableAndUnresolvable.
	failedPlugin string
}

NodeResourcesFit [5]

NodeResourcesFit 扩展检查节点是否拥有 Pod 请求的所有资源。分数可以使用以下三种策略之一,扩展点为:preFilterfilterscore

Fit

NodeResourcesFit PreFilter 可以看到调用得 computePodResourceRequest

// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
   return nil, nil
}

computePodResourceRequest 这里有一个注释,总体解释起来是这样得:computePodResourceRequest ,返回值( framework.Resource)覆盖了每一个维度中资源的最大宽度。因为将按照 init-containers , containers 得顺序运行,会通过迭代方式收集每个维度中的最大值。计算时会对常规容器的资源向量求和,因为containers 运行会同时运行多个容器。计算示例为:

Pod:
  InitContainers
    IC1:
      CPU: 2
      Memory: 1G
    IC2:
      CPU: 2
      Memory: 3G
  Containers
    C1:
      CPU: 2
      Memory: 1G
    C2:
      CPU: 1
      Memory: 1G

在维度1中(InitContainers)所需资源最大值时,CPU=2, Memory=3G;而维度2(Containers)所需资源最大值为:CPU=2, Memory=1G;那么最终结果为 CPU=3, Memory=3G,因为在维度1,最大资源时Memory=3G;而维度2最大资源是CPU=1+2, Memory=1+1,取每个维度中最大资源最大宽度即为 CPU=3, Memory=3G。

下面则看下代码得实现

func computePodResourceRequest(pod *v1.Pod) *preFilterState {
	result := &preFilterState{}
	for _, container := range pod.Spec.Containers {
		result.Add(container.Resources.Requests)
	}

	// 取最大得资源
	for _, container := range pod.Spec.InitContainers {
		result.SetMaxResource(container.Resources.Requests)
	}

	// 如果Overhead正在使用,需要将其计算到总资源中
	if pod.Spec.Overhead != nil {
		result.Add(pod.Spec.Overhead)
	}
	return result
}

// SetMaxResource 是比较ResourceList并为每个资源取最大值。
func (r *Resource) SetMaxResource(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuantity := range rl {
		switch rName {
		case v1.ResourceMemory:
			r.Memory = max(r.Memory, rQuantity.Value())
		case v1.ResourceCPU:
			r.MilliCPU = max(r.MilliCPU, rQuantity.MilliValue())
		case v1.ResourceEphemeralStorage:
			if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
				r.EphemeralStorage = max(r.EphemeralStorage, rQuantity.Value())
			}
		default:
			if schedutil.IsScalarResourceName(rName) {
				r.SetScalar(rName, max(r.ScalarResources[rName], rQuantity.Value()))
			}
		}
	}
}

leastAllocate

LeastAllocated 是 NodeResourcesFit 的打分策略 ,LeastAllocated 打分的标准是更偏向于请求资源较少的Node。将会先计算出Node上调度的pod请求的内存、CPU与其他资源的百分比,然后并根据请求的比例与容量的平均值的最小值进行优先级排序。

计算公式是这样的:\(\frac{\frac{cpu((capacity-requested) \times MaxNodeScore \times cpuWeight)}{capacity} + \frac{memory((capacity-requested) \times MaxNodeScore \times memoryWeight}{capacity}) + ...}{weightSum}\)

下面来看下实现

func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap) int64 {
	return func(requested, allocable resourceToValueMap) int64 {
		var nodeScore, weightSum int64
		for resource := range requested {
			weight := resToWeightMap[resource]
            //  计算出的资源分数乘weight
			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		if weightSum == 0 {
			return 0
		}
        // 最终除weightSum
		return nodeScore / weightSum
	}
}

leastRequestedScore 计算标准为未使用容量的计算范围为 0~MaxNodeScore,0 为最低优先级,MaxNodeScore 为最高优先级。未使用的资源越多,得分越高。

func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}
	// 容量 - 请求的 x 预期值(100)/ 容量
	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}

Topology [6]

Concept

在对 podtopologyspread 插件进行分析前,先需要掌握Pod拓扑的概念。

Pod拓扑(Pod Topology)是Kubernetes Pod调度机制,可以将Pod分布在集群中不同 Zone ,以及用户自定义的各种拓扑域 (topology domains)。当有了拓扑域后,用户可以更高效的利用集群资源。

如何来解释拓扑域,首先需要提及为什么需要拓扑域,在集群有3个节点,并且当Pod副本数为2时,又不希望两个Pod在同一个Node上运行。在随着扩大Pod的规模,副本数扩展到到15个时,这时候最理想的方式是每个Node运行5个Pod,在这种背景下,用户希望对集群中Zone的安排为相似的副本数量,并且在集群存在部分问题时可以更好的自愈(也是按照相似的副本数量均匀的分布在Node上)。在这种情况下Kubernetes 提供了Pod 拓扑约束来解决这个问题。

定义一个Topology

apiVersion: v1
kind: Pod
metadata:
  name: example-pod
spec:
  # Configure a topology spread constraint
  topologySpreadConstraints:
    - maxSkew: <integer> # 
      minDomains: <integer> # optional; alpha since v1.24
      topologyKey: <string>
      whenUnsatisfiable: <string>
      labelSelector: <object>

参数的描述

对于拓扑域的理解

对于拓扑域,官方是这么说明的,假设有一个带有以下lable的 4 节点集群:

NAME    STATUS   ROLES    AGE     VERSION   LABELS
node1   Ready    <none>   4m26s   v1.16.0   node=node1,zone=zoneA
node2   Ready    <none>   3m58s   v1.16.0   node=node2,zone=zoneA
node3   Ready    <none>   3m17s   v1.16.0   node=node3,zone=zoneB
node4   Ready    <none>   2m43s   v1.16.0   node=node4,zone=zoneB

那么集群拓扑如图:

image

图1:集群拓扑图
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

假设一个 4 节点集群,其中 3个label被标记为foo: bar的 Pod 分别位于Node1、Node2 和 Node3:

image

图2:集群拓扑图
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

这种情况下,新部署一个Pod,并希望新Pod与现有Pod跨 Zone均匀分布,资源清单文件如下:

kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

这个清单对于拓扑域来说,topologyKey: zone 表示对Pod均匀分布仅应用于已标记的节点(如 foo: bar),将会跳过没有标签的节点(如zone: <any value>)。如果 scheduler 找不到满足约束的方法,whenUnsatisfiable: DoNotSchedule 设置的策略则是 scheduler 对新部署的Pod保持 Pendding

如果此时 scheduler 将新Pod 调度至 \(Zone_A\),此时Pod分布在拓扑域间为 \([3,1]\) ,而 maxSkew 配置的值是1。此时倾斜值为 \(Zone_A - Zone_B = 3-1=2\),不满足 maxSkew=1,故这个Pod只能被调度到 \(Zone_B\)。

此时Pod调度拓扑图为图3或图4

image

图3:集群拓扑图
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

image

图4:集群拓扑图
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

如果需要将Pod调度到 \(Zone_A\) ,可以按照如下方式进行:

下面再通过一个例子增强对拓扑域了解

多拓扑约束

设拥有一个 4 节点集群,其中 3 个现有 Pod 标记 foo: bar 分别位于 node1node2node3

image

图5:集群拓扑图
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/

部署的资源清单如下:可以看出拓扑分布约束配置了多个

kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  - maxSkew: 1
    topologyKey: node
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

在这种情况下,为了匹配第一个约束条件,新Pod 只能放置在 \(Zone_B\) ;而就第二个约束条件,新Pod只能调度到 node4。在这种配置多约束条件下, scheduler 只考虑满足所有约束的值,因此唯一有效的是 node4

如何为集群设置一个默认拓扑域约束

默认情况下,拓扑域约束也作 scheduler 的为 scheduler configurtion 中的一部分参数,这也意味着,可以通过profile为整个集群级别指定一个默认的拓扑域调度约束,

apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration

profiles:
  - schedulerName: default-scheduler
    pluginConfig:
      - name: PodTopologySpread
        args:
          defaultConstraints:
            - maxSkew: 1
              topologyKey: topology.kubernetes.io/zone
              whenUnsatisfiable: ScheduleAnyway
          defaultingType: List

默认约束策略

如果在没有配置集群级别的约束策略时,kube-scheduler 内部 topologyspread 插件提供了一个默认的拓扑约束策略,大致上如下列清单所示

defaultConstraints:
  - maxSkew: 3
    topologyKey: "kubernetes.io/hostname"
    whenUnsatisfiable: ScheduleAnyway
  - maxSkew: 5
    topologyKey: "topology.kubernetes.io/zone"
    whenUnsatisfiable: ScheduleAnyway

上述清单中内容可以在 pkg\scheduler\framework\plugins\podtopologyspread\plugin.go

var systemDefaultConstraints = []v1.TopologySpreadConstraint{
	{
		TopologyKey:       v1.LabelHostname,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           3,
	},
	{
		TopologyKey:       v1.LabelTopologyZone,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           5,
	},
}

可以通过在配置文件中留空,来禁用默认配置

apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration

profiles:
  - schedulerName: default-scheduler
    pluginConfig:
      - name: PodTopologySpread
        args:
          defaultConstraints: []
          defaultingType: List

通过源码学习Topology

podtopologyspread 实现了4种扩展点方法,包含 filterscore

PreFilter

可以看到 PreFilter 的核心为 calPreFilterState

func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	s, err := pl.calPreFilterState(ctx, pod)
	if err != nil {
		return nil, framework.AsStatus(err)
	}
	cycleState.Write(preFilterStateKey, s)
	return nil, nil
}

calPreFilterState 主要功能是用在计算如何在拓扑域中分布Pod,首先看段代码时,需要掌握下属几个概念

func (pl *PodTopologySpread) calPreFilterState(ctx context.Context, pod *v1.Pod) (*preFilterState, error) {
    // 获取Node
	allNodes, err := pl.sharedLister.NodeInfos().List()
	if err != nil {
		return nil, fmt.Errorf("listing NodeInfos: %w", err)
	}
	var constraints []topologySpreadConstraint
	if len(pod.Spec.TopologySpreadConstraints) > 0 {
		// 这里会构建出TopologySpreadConstraints,因为约束是不确定的
		constraints, err = filterTopologySpreadConstraints(
			pod.Spec.TopologySpreadConstraints,
			v1.DoNotSchedule,
			pl.enableMinDomainsInPodTopologySpread,
			pl.enableNodeInclusionPolicyInPodTopologySpread,
		)
		if err != nil {
			return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
		}
	} else {
        // buildDefaultConstraints使用".DefaultConstraints"与pod匹配的
        // service、replication controllers、replica sets 
        // 和stateful sets的选择器为pod构建一个约束。
		constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
		if err != nil {
			return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
		}
	}
	if len(constraints) == 0 { // 如果是空的,则返回空preFilterState
		return &preFilterState{}, nil
	}
    // 初始化一个 preFilterState 状态
	s := preFilterState{
		Constraints:          constraints,
		TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
		TpPairToMatchNum:     make(map[topologyPair]int, sizeHeuristic(len(allNodes), constraints)),
	}
	// 根据node统计拓扑域数量
	tpCountsByNode := make([]map[topologyPair]int, len(allNodes))
	// 获取pod亲和度配置
	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
	processNode := func(i int) {
		nodeInfo := allNodes[i]
		node := nodeInfo.Node()
		if node == nil {
			klog.ErrorS(nil, "Node not found")
			return
		}
		// 通过spreading去过滤node以用作filters,错误解析以向后兼容
		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
			if match, _ := requiredNodeAffinity.Match(node); !match {
				return
			}
		}

		// 确保node的lable 包含topologyKeys定义的值
		if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
			return
		}

		tpCounts := make(map[topologyPair]int, len(constraints))
		for _, c := range constraints { // 对应的约束列表
			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
				continue
			}
			// 构建出 topologyPair 以key value形式,
			// 通常情况下TopologyKey属于什么类型的拓扑
			//  node.Labels[c.TopologyKey] 则是属于这个拓扑中那个子域
			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
			// 计算与标签选择器相匹配的pod有多少个
			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
			tpCounts[pair] = count
		}
		tpCountsByNode[i] = tpCounts // 最终形成的拓扑结构
	}
	// 执行上面的定义的processNode,执行的数量就是node的数量
	pl.parallelizer.Until(ctx, len(allNodes), processNode)
	// 最后构建出 TpPairToMatchNum
	// 表示每个拓扑域中的每个子域各分布多少Pod,如图6所示
	for _, tpCounts := range tpCountsByNode {
		for tp, count := range tpCounts {
			s.TpPairToMatchNum[tp] += count
		}
	}
	if pl.enableMinDomainsInPodTopologySpread {
		// 根据状态进行构建 preFilterState
		s.TpKeyToDomainsNum = make(map[string]int, len(constraints))
		for tp := range s.TpPairToMatchNum {
			s.TpKeyToDomainsNum[tp.key]++
		}
	}

	// 计算最小匹配出的拓扑对
	for i := 0; i < len(constraints); i++ {
		key := constraints[i].TopologyKey
		s.TpKeyToCriticalPaths[key] = newCriticalPaths()
	}
	for pair, num := range s.TpPairToMatchNum {
		s.TpKeyToCriticalPaths[pair.key].update(pair.value, num)
	}

	return &s, nil // 返回的值则包含最小的分布
}

preFilterState

// preFilterState 是在PreFilter处计算并在Filter处使用。
// 它结合了 “TpKeyToCriticalPaths” 和 “TpPairToMatchNum” 来表示:
//(1)在每个分布约束上匹配最少pod的criticalPaths。 
// (2) 在每个分布约束上匹配的pod的数量。
// “nil preFilterState” 则表示没有设置(在PreFilter阶段);
// empty “preFilterState”对象则表示它是一个合法的状态,并在PreFilter阶段设置。

type preFilterState struct {
	Constraints []topologySpreadConstraint

    // 这里记录2条关键路径而不是所有关键路径。 
    // criticalPaths[0].MatchNum 始终保存最小匹配数。 
    // criticalPaths[1].MatchNum 总是大于或等于criticalPaths[0].MatchNum,但不能保证是第二个最小匹配数。
	TpKeyToCriticalPaths map[string]*criticalPaths
	
    // TpKeyToDomainsNum 以 “topologyKey” 作为key ,并以zone的数量作为值。
	TpKeyToDomainsNum map[string]int
	
    // TpPairToMatchNum 以 “topologyPair作为key” ,并以匹配到pod的数量作为value。
	TpPairToMatchNum map[topologyPair]int
}

criticalPaths

// [2]criticalPath能够工作的原因是基于当前抢占算法的实现,特别是以下两个事实
// 事实 1:只抢占同一节点上的Pod,而不是多个节点上的 Pod。
// 事实 2:每个节点在其抢占周期期间在“preFilterState”的单独副本上进行评估。如果我们计划转向更复杂的算法,例如“多个节点上的任意pod”时则需要重新考虑这种结构。
type criticalPaths [2]struct {
	// TopologyValue代表映射到拓扑键的拓扑值。
	TopologyValue string
	// MatchNum代表匹配到的pod数量
	MatchNum int
}

单元测试中的测试案例,具有两个约束条件的场景,通过表格来解析如下:

Node列表与标签如下表:

Node Name

标签:node,插件,nil,kubernetes,framework,Pod,搞懂,pod
来源: https://www.cnblogs.com/Cylon/p/16526862.html