kubelet监控静态Pod
作者:互联网
获取静态Pod路径
静态Pod路径默认是空。
当静态Pod路径是空时,路径会被设置成/etc/kubernetes/manifests。
检测周期
/var/lib/kubelet/config.yaml配置了FileCheckFrequency值是20s即List的间隔周期是20s。
List和Watch流程
doWatch函数的重试回退流程
主流程
pkg/kubelet/config/file_linux.go
startWatch函数
如果watch失败,那么看错误是否支持重试。
1. 支持重试,继续watch。
2. 不支持重试,进入回退流程。
创建Backoff对象后,每隔1s调用doWatch函数(如果处于回退流程中,那么需要等待,不会调用doWatch函数)。
const (
// 回退最小时间
retryPeriod = 1 * time.Second
// 回退最大时间
maxRetryPeriod = 20 * time.Second
)
func (s *sourceFile) startWatch() {
// 创建Backoff对象
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
backOffId := "watch"
go wait.Forever(func() {
if backOff.IsInBackOffSinceUpdate(backOffId, time.Now()) {
return
}
if err := s.doWatch(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
if _, retryable := err.(*retryableError); !retryable {
backOff.Next(backOffId, time.Now())
}
}
}, retryPeriod)
}
创建Backoff对象
staging/src/k8s.io/client-go/util/flowcontrol/backoff.go
NewBackOff函数
type Duration int64
type Backoff struct {
sync.Mutex
Clock clock.Clock
defaultDuration time.Duration
maxDuration time.Duration
perItemBackoff map[string]*backoffEntry
}
type backoffEntry struct {
// 回退间隔时间
backoff time.Duration
// 回退开始时间
lastUpdate time.Time
}
type RealClock struct{}
// 参数值分别是1s和20s
func NewBackOff(initial, max time.Duration) *Backoff {
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: clock.RealClock{},
defaultDuration: initial,
maxDuration: max,
}
}
判断是否在回退流程中
staging/src/k8s.io/client-go/util/flowcontrol/backoff.go
IsInBackOffSinceUpdate函数
如果函数返回false,那么说明还在回退流程中;否则,说明回退流程已经结束。
1. 加锁。
2. 获取map里面key是“watch”对应的value,即backoffEntry。
3. 如果没有该key,那么返回false。
4. 如果当前时间与回退开始时间差值>2倍maxDuration,那么返回false。(此步多余,第5步已经覆盖了,2倍maxDuration>backoff)
5. 如果当前时间与回退开始时间差值<回退间隔时间,那么返回false;否则,返回true。
6. 释放锁。
func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return eventTime.Sub(entry.lastUpdate) < entry.backoff
}
插入或更新回退间隔时间、更新回退开始时间
staging/src/k8s.io/client-go/util/flowcontrol/backoff.go
Next函数
1. 加锁。
2. 获取map里面key是“watch”对应的value,即backoffEntry。
3. 如果没有该key或者当前时间与回退开始时间差值>2倍maxDuration,那么初始化(插入key-“watch”对应的backoffEntry);否则,backoffEntry的回退间隔时间backoff翻倍,最大值是maxDuration即20s。
4. 更新backoffEntry里面的lastUpdate即回退开始时间。
5. 释放锁。
// 当前时间与回退开始时间差值最大是2倍maxDuration。
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
return eventTime.Sub(lastUpdate) > maxDuration*2
}
func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
// 起始回退间隔时间是1s。
entry := &backoffEntry{backoff: p.defaultDuration}
p.perItemBackoff[id] = entry
return entry
}
func (p *Backoff) Next(id string, eventTime time.Time) {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
// 只有第一次回退或者当前时间与回退开始时间差值超过2倍maxDuration即40s,才会把回退间隔时间重置成1s。
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
entry = p.initEntryUnsafe(id)
} else {
delay := entry.backoff * 2 // exponential
// 确保回退间隔时间最大是20s。
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
}
entry.lastUpdate = p.Clock.Now()
}
回退间隔时间的变化(不是单调递增,而是先增后保持不变)
1s |
2s |
4s |
8s |
16s |
20s |
... |
20s |
一开始,呈现2的指数级变化,直到16s。最后,一直20s。
除非当前时间与回退开始时间差值超过2倍maxDuration即40s,从而重置回退间隔时间为1s。
自己动手写Demo
编译并执行二进制文件
git clone git@gitee.com:wangjingqian1995/kubelet-monitor-manifests-demo.git
cd kubelet-monitor-manifests-demo
go build main.go
./main
测试数据
/root/test目录下放入a.log和b.log,内容分别如下:
$ cat a.log
{"Namespace":"Bett", "Name":"McLaugh", "Desc":"beijing"}
$ cat b.log
{"Namespace":"abc", "Name":"McLaugh", "Desc":"xxx"}
效果
流程图
缓存1的全量数据是最新的,缓存2的全量数据是旧的。
缓存1和缓存2对比方式:
遍历缓存1中所有的key,如果缓存1中的key在缓存2中没有,那么说明该info需要增加;如果缓存1中的key在缓存2中有而且value发生了变化,那么该info需要更新。
遍历缓存2中所有的key,如果缓存2中的key在缓存1中没有,那么说明该info需要删除。
把变化的信息发送到缓冲通道,交给数据处理中心。
标签:缓存,静态,kubelet,time,回退,entry,Pod,backoffEntry,maxDuration 来源: https://www.cnblogs.com/WJQ2017/p/16172620.html