调度器调度队列之 activeQ 分析 | 视频文字稿

共 17242字,需浏览 35分钟

 ·

2021-03-15 20:10

前面我们分析了 kube-scheduler 组件如何接收命令行参数,用传递的参数构造一个 Scheduler 对象,最终启动了调度器。调度器启动后就可以开始为未调度的 Pod 进行调度操作了,本文主要来分析调度器是如何对一个 Pod 进行调度操作过程中的活动队列。

调度队列

调度器启动后最终是调用 Scheduler 下面的 Run() 函数来开始调度 Pod,如下所示代码:

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
 if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
  return
 }
 sched.SchedulingQueue.Run()
 wait.UntilWithContext(ctx, sched.scheduleOne, 0)
 sched.SchedulingQueue.Close()
}

首先会等待所有的 cache 同步完成,然后开始执行 SchedulingQueue 的 Run() 函数,SchedulingQueue 是一个队列接口,用于存储待调度的 Pod,该接口遵循类似于 cache.FIFOcache.Heap 这样的数据结构,要弄明白调度器是如何去调度 Pod 的,我们就首先需要弄清楚这个结构:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 用于存储带调度 Pod 的队列接口
type SchedulingQueue interface {
 framework.PodNominator
 // AddUnschedulableIfNotPresent 将无法调度的 Pod 添加回调度队列
  // podSchedulingCycle表示可以通过调用 SchedulingCycle() 返回的当前调度周期号
 AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
  // SchedulingCycle 返回由调度队列缓存的当前调度周期数。 
  // 通常,只要弹出一个 Pod(例如调用 Pop() 函数),就增加此数字。
 SchedulingCycle() int64
  
  // 下面是通用队列相关操作
  // Pop 删除队列的头并返回它。 
  // 如果队列为空,它将阻塞,并等待直到新元素添加到队列中
 Pop() (*framework.QueuedPodInfo, error)
  // 往队列中添加一个 Pod
 Add(pod *v1.Pod) error
 Update(oldPod, newPod *v1.Pod) error
 Delete(pod *v1.Pod) error

 MoveAllToActiveOrBackoffQueue(event string)
 AssignedPodAdded(pod *v1.Pod)
 AssignedPodUpdated(pod *v1.Pod)
 PendingPods() []*v1.Pod
  // 关闭 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
 Close()
 // NumUnschedulablePods 返回 SchedulingQueue 中存在的不可调度 Pod 的数量
 NumUnschedulablePods() int
 // 启动管理队列的goroutine
 Run()
}

SchedulingQueue 是一个用于存储带调度 Pod 的队列接口,在构造 Scheduler 对象的时候我们可以了解到调度器中是如何实现这个队列接口的:

// pkg/scheduler/factory.go

// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
 lessFn,
 internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
 internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
 internalqueue.WithPodNominator(nominator),
)
......
return &Scheduler{
 ......
 NextPod:         internalqueue.MakeNextPodFunc(podQueue),
  ......
 SchedulingQueue: podQueue,
}, nil

可以看到上面的 internalqueue.NewSchedulingQueue 就是创建的一个 SchedulingQueue 对象,定义如下所示:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 初始化一个优先级队列作为一个新的调度队列
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
 return NewPriorityQueue(lessFn, opts...)
}

// 配置 PriorityQueue
type Option func(*priorityQueueOptions)

// 创建一个 PriorityQueue 对象
func NewPriorityQueue(
 lessFn framework.LessFunc,
 opts ...Option,
)
 *PriorityQueue
 {
  ......

  comp := func(podInfo1, podInfo2 interface{}) bool {
  pInfo1 := podInfo1.(*framework.QueuedPodInfo)
  pInfo2 := podInfo2.(*framework.QueuedPodInfo)
  return lessFn(pInfo1, pInfo2)
 }
  ......

  pq := &PriorityQueue{
  PodNominator:              options.podNominator,
  clock:                     options.clock,
  stop:                      make(chan struct{}),
  podInitialBackoffDuration: options.podInitialBackoffDuration,
  podMaxBackoffDuration:     options.podMaxBackoffDuration,
  activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
  unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
  moveRequestCycle:          -1,
 }
 pq.cond.L = &pq.lock
 pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

 return pq
}

从上面的初始化过程可以看出来 PriorityQueue 这个优先级队列实现了 SchedulingQueue 接口,所以真正的实现还需要去查看这个优先级队列:

// pkg/scheduler/internal/queue/scheduling_queue.go

// PriorityQueue 实现了调度队列 SchedulingQueue
// PriorityQueue 的头部元素是优先级最高的 pending Pod,该结构有三个子队列:
// 一个子队列包含正在考虑进行调度的 Pod,称为 activeQ,是一个堆
// 另一个队列包含已尝试并且确定为不可调度的 Pod,称为 unschedulableQ
// 第三个队列包含从 unschedulableQ 队列移出的 Pod,并在 backoff 完成后将其移到 activeQ 队列
type PriorityQueue struct {
 framework.PodNominator

 stop  chan struct{}
 clock util.Clock

 // pod 初始 backoff 的时间
 podInitialBackoffDuration time.Duration
 // pod 最大 backoff 的时间
 podMaxBackoffDuration time.Duration

 lock sync.RWMutex
 cond sync.Cond  // condition

  // activeQ 是调度程序主动查看以查找要调度 pod 的堆结构,堆头部是优先级最高的 Pod
 activeQ *heap.Heap
  // backoff 队列
 podBackoffQ *heap.Heap
 // unschedulableQ 不可调度队列
 unschedulableQ *UnschedulablePodsMap
  // 调度周期的递增序号,当 pop 的时候会增加
 schedulingCycle int64
  // moveRequestCycle 会缓存 schedulingCycle 的值
  // 当未调度的 Pod 重新被添加到 activeQ 中会保存 schedulingCycle 到 moveRequestCycle 中
 moveRequestCycle int64

 // 表明队列已经被关闭
 closed bool
}

这里使用的是一个 PriorityQueue 优先级队列来存储带调度的 Pod,这个也很好理解,普通队列是一个 FIFO 数据结构,根据元素进入队列的顺序依次出队,而对于调度的这个场景,优先级队列显然更合适,可以根据某些优先级策略,优先对某个 Pod 进行调度。

PriorityQueue 的头部元素是优先级最高的带调度的 Pod,该结构有三个子队列:

  • 活动队列(activeQ)
  • 不可调度队列(unschedulableQ):当 Pod 不能满足被调度的条件的时候就会被加入到这个不可调度的队列中来,等待后续继续进行尝试调度
  • backoff 队列:如果任务反复执行还是失败,则会按尝试次数增加等待调度时间,降低重试效率,从而避免反复失败浪费调度资源。对于调度失败的 Pod 会优先存储在 backoff 队列中,等待后续进行重试

这里我们需要来弄清楚这几个队列是如何实现的。

活动队列

活动队列(activeQ)是存储当前系统中所有在等待调度的 Pod 队列,在上面实例化优先级队列里面可以看到 activeQ 队列的初始化是通过调用 heap.NewWithRecorder() 函数实现的。

// pkg/scheduler/internal/heap/heap.go

// NewWithRecorder 就是 Heap 基础上包装了 metrics 数据
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
 return &Heap{
  data: &data{
   items:    map[string]*heapItem{},
   queue:    []string{},
   keyFunc:  keyFn,
   lessFunc: lessFn,
  },
  metricRecorder: metricRecorder,
 }
}

// lessFunc 接收两个元素,对列表进行排序时,将第一个元素放在第二个元素之前,则返回true。
type lessFunc = func(item1, item2 interface{}) bool

其中的 data 数据结构是 Golang 中的一个标准 heap 堆(只需要实现 heap.Interface 接口即可),然后 Heap 是在 data 基础上新增了一个用于记录 metrics 数据的堆,这里最重要的就是用比较元素优先级的 lessFunc 函数的实现,在初始化优先级队列的时候我们传入了一个 comp 的参数,这个参数就是 activeQ 这个堆里面的 lessFunc 函数的实现:

comp := func(podInfo1, podInfo2 interface{}) bool {
  pInfo1 := podInfo1.(*framework.QueuedPodInfo)
  pInfo2 := podInfo2.(*framework.QueuedPodInfo)
  return lessFn(pInfo1, pInfo2)
 }

最终是调用的创建 Scheduler 对象的时候传入的 lessFn 参数:

lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()

从这里可以看到比较元素优先级是通过调度框架的 QueueSortFunc() 函数来实现的,对应的实现如下所示:

// pkg/scheduler/framework/runtime/framework.go

// QueueSortFunc 返回用于对调度队列中的 Pod 进行排序的函数
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
 if f == nil {
  // 如果 frameworkImpl 为nil,则只需保持其顺序不变
  // NOTE: 主要用于测试
  return func(_, _ *framework.QueuedPodInfo) bool { return false }
 }
  // 如果没有 queuesort 插件
 if len(f.queueSortPlugins) == 0 {
  panic("No QueueSort plugin is registered in the frameworkImpl.")
 }

 // 只有一个 QueueSort 插件有效
 return f.queueSortPlugins[0].Less
}

最终真正用于优先级队列元素优先级比较的函数是通过 QueueSort 插件来实现的,默认启用的 QueueSort 插件是 PrioritySort:

// pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
 return &schedulerapi.Plugins{
  QueueSort: &schedulerapi.PluginSet{
   Enabled: []schedulerapi.Plugin{
    {Name: queuesort.Name},
   },
  },
    ......

PrioritySort 这个插件的核心实现就是其 Less 函数了:

// pkg/scheduler/framework/plugins/queuesort/priority_sort.go

// Less 是 activeQ 队列用于对 Pod 进行排序的函数。
// 它根据 Pod 的优先级对 Pod 进行排序,
// 当优先级相同时,它使用 PodQueueInfo.timestamp 进行比较
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
 p1 := pod.GetPodPriority(pInfo1.Pod)
 p2 := pod.GetPodPriority(pInfo2.Pod)
  // 先根据优先级的高低进行比较,然后根据 Pod 的创建时间
  // 越高优先级的 Pod 越被优先调度,越早创建的pod越优先
 return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

// pkg/api/v1/pod/util.go

// GetPodPriority 获取指定 Pod 的优先级
func GetPodPriority(pod *v1.Pod) int32 {
 if pod.Spec.Priority != nil {
  return *pod.Spec.Priority
 }
 return 0
}

到这里就真相大白了,对于 activeQ 活动队列中的 Pod 是依靠 PrioritySort 插件来进行优先级比较的,每个 Pod 在被创建后都会有一个 priority 属性来标记 Pod 优先级,然后在调度 Pod 的时候会先根据 Pod 优先级的高低进行比较,如果优先级相同,则回根据 Pod 的创建时间进行比较,越高优先级的 Pod 越被优先调度,越早创建的Pod 越优先。

那么 Pod 是在什么时候加入到 activeQ 活动队列的呢?还记得前面我们在创建 Scheduler 对象的时候有一个 addAllEventHandlers 函数吗?其中就有对未调度 Pod 的事件监听处理操作。

// pkg/scheduler/eventhandlers.go

// unscheduled pod queue
podInformer.Informer().AddEventHandler(
 cache.FilteringResourceEventHandler{
  FilterFunc: func(obj interface{}) bool {
   switch t := obj.(type) {
   case *v1.Pod:
    return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
   case cache.DeletedFinalStateUnknown:
    if pod, ok := t.Obj.(*v1.Pod); ok {
     return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
    }
    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
    return false
   default:
    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
    return false
   }
  },
  Handler: cache.ResourceEventHandlerFuncs{
   AddFunc:    sched.addPodToSchedulingQueue,
   UpdateFunc: sched.updatePodInSchedulingQueue,
   DeleteFunc: sched.deletePodFromSchedulingQueue,
  },
 },
)

当 Pod 有事件变化后,首先回通过 FilterFunc 函数进行过滤,如果 Pod 没有绑定到节点(未调度)并且使用的是指定的调度器才进入下面的 Handler 进行处理,比如当创建 Pod 以后就会有 onAdd 的添加事件,这里调用的就是 sched.addPodToSchedulingQueue 函数:

// pkg/scheduler/eventhandlers.go

// 添加未调度的 Pod 到优先级队列
func(sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
   pod := obj.(*v1.Pod)
   klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
   if err := sched.SchedulingQueue.Add(pod); err != nil {
      utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
   }
}

可以看到这里当 Pod 被创建后,会将 Pod 通过调度队列 SchedulingQueue 的 Add 函数添加到优先级队列中去:

// pkg/scheduler/internal/queue/scheduling_queue.go

// Add 添加 Pod 到 activeQ 活动队列,仅当添加了新的 Pod 时才应调用它
// 这样 Pod 就不会已经处于 active/unschedulable/backoff 队列中了
func (p *PriorityQueue) Add(pod *v1.Pod) error {
 p.lock.Lock()
 defer p.lock.Unlock()
 pInfo := p.newQueuedPodInfo(pod)
  // 添加到 activeQ 队列
 if err := p.activeQ.Add(pInfo); err != nil {
  klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
  return err
 }
  // 如果在 unschedulableQ 队列中,则从改队列移除
 if p.unschedulableQ.get(pod) != nil {
  klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
  p.unschedulableQ.delete(pod)
 }
 // 从 backoff 队列删除
 if err := p.podBackoffQ.Delete(pInfo); err == nil {
  klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
 }
  // 记录metrics
 metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
 p.PodNominator.AddNominatedPod(pod, "")
  // 通知其他地方进行处理
 p.cond.Broadcast()

 return nil
}

这就是 activeQ 活动队列添加元素的过程。

调度 Pod

当我们把新创建的 Pod 添加到 activeQ 活动队列过后,就可以在另外的协程中从这个队列中弹出堆顶的元素来进行具体的调度处理了。这里就要回头本文开头部分调度器启动后执行的一个调度操作了 sched.scheduleOne

// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 从调度器中获取下一个要调度的 Pod
 podInfo := sched.NextPod()
 ......
}

scheduleOne 函数在最开始调用 sched.NextPod() 函数来获取现在需要调度的 Pod,其实就是上面 activeQ 活动队列中 Pop 出来的元素,当实例化 Scheduler 对象的时候就指定了 NextPod 函数:internalqueue.MakeNextPodFunc(podQueue)

// pkg/scheduler/internal/queue/scheduling_queue.go

// MakeNextPodFunc 返回一个函数,用于从指定的调度队列中获取下一个 Pod 进行调度
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
 return func() *framework.QueuedPodInfo {
  podInfo, err := queue.Pop()
  ......
  return nil
 }
}

很明显这里就是调用的优先级队列的 Pop() 函数来弹出队列中的 Pod 进行调度处理。

// pkg/scheduler/internal/queue/scheduling_queue.go

// Pop 删除 activeQ 活动队列的头部元素并返回它。
// 如果 activeQ 为空,它将阻塞,并等待直到新元素添加到队列中。
// 当 Pod 弹出后会增加调度周期参数的值。
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
 p.lock.Lock()
 defer p.lock.Unlock()
 for p.activeQ.Len() == 0 {
    // 当队列为空时,将阻塞Pop()的调用,直到新元素入队。
    // 调用Close()时,将设置p.closed并广播condition,这将导致此循环继续并从Pop()返回。
  if p.closed {
   return nil, fmt.Errorf(queueClosed)
  }
  p.cond.Wait()
 }
  // 从 activeQ 队列弹出堆顶元素
 obj, err := p.activeQ.Pop()
 if err != nil {
  return nil, err
 }
 pInfo := obj.(*framework.QueuedPodInfo)
 pInfo.Attempts++
  // 增加调度周期次数
 p.schedulingCycle++
 return pInfo, err
}

Pop() 函数很简单,就是从 activeQ 队列中弹出堆顶的元素返回即可。拿到了要调度的 Pod,接下来就是去真正执行调度逻辑了。


 点击屏末  | 学习k8s开发课程
浏览 74
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐