Pod 拓扑分布约束使用及调度原理

k8s技术圈

共 37303字,需浏览 75分钟

 ·

2021-03-26 19:06

在 k8s 集群调度中,“亲和性”相关的概念本质上都是控制 Pod 如何被调度 - 堆叠或是打散。目前 k8s 提供了 podAffinity 以及 podAntiAffinity 两个特性对 Pod 在不同拓扑域的分布进行了一些控制,podAffinity 可以将无数个 Pod 调度到特定的某一个拓扑域,这是堆叠的体现;podAntiAffinity 则可以控制一个拓扑域只存在一个 Pod,这是打散的体现。但这两种情况都太极端了,在不少场景下都无法达到理想的效果,例如为了实现容灾和高可用,将业务 Pod 尽可能均匀的分布在不同可用区就很难实现。

PodTopologySpread 特性的提出正是为了对 Pod 的调度分布提供更精细的控制,以提高服务可用性以及资源利用率,PodTopologySpreadEvenPodsSpread 特性门所控制,在 v1.16 版本第一次发布,并在 v1.18 版本进入 beta 阶段默认启用。再了解这个插件是如何实现之前,我们首先需要搞清楚这个特性是如何使用的。

使用规范

在 Pod 的 Spec 规范中新增了一个 topologySpreadConstraints 字段:

spec:
  topologySpreadConstraints:
  - maxSkew: <integer>
    topologyKey: <string>
    whenUnsatisfiable: <string>
    labelSelector: <object>

由于这个新增的字段是在 Pod spec 层面添加,因此更高层级的控制 (Deployment、DaemonSet、StatefulSet) 也能使用 PodTopologySpread 功能。

让我们结合上图来理解 topologySpreadConstraints 中各个字段的含义和作用:

  • labelSelector: 用来查找匹配的 Pod,我们能够计算出每个拓扑域中匹配该 label selector 的 Pod 数量,在上图中,假如 label selector 是 app:foo,那么 zone1 的匹配个数为 2, zone2 的匹配个数为 0。
  • topologyKey: 是 Node label 的 key,如果两个 Node 的 label 同时具有该 key 并且 label 值相同,就说它们在同一个拓扑域。在上图中,指定 topologyKey 为 zone, 具有 zone=zone1  标签的 Node 被分在一个拓扑域,具有 zone=zone2 标签的 Node 被分在另一个拓扑域。
  • maxSkew: 描述了 Pod 在不同拓扑域中不均匀分布的最大程度maxSkew 的取值必须大于 0。每个拓扑域都有一个 skew,计算的公式是: skew[i] = 拓扑域[i]中匹配的 Pod 个数 - min{其他拓扑域中匹配的 Pod 个数}。在上图中,我们新建一个带有 app=foo 标签的 Pod:
    • 如果该 Pod 被调度到 zone1,那么 zone1 中 Node 的 skew 值变为 3,zone2 中 Node 的 skew 值变为 0 (zone1 有 3 个匹配的 Pod,zone2 有 0 个匹配的 Pod )
    • 如果该 Pod 被调度到 zone2,那么 zone1 中 Node 的 skew 值变为 1,zone2 中 Node 的 skew 值变为 0 (zone2 有 1 个匹配的 Pod,拥有全局最小匹配 Pod 数的拓扑域正是 zone2 自己 )
  • whenUnsatisfiable: 描述了如果 Pod 不满足分布约束条件该采取何种策略:
    • DoNotSchedule (默认) 告诉调度器不要调度该 Pod,因此也可以叫作硬策略;
    • ScheduleAnyway 告诉调度器根据每个 Node 的 skew 值打分排序后仍然调度,因此也可以叫作软策略。

下面我们用两个实际的示例来进一步说明。

单个 TopologySpreadConstraint

假设你拥有一个 4 节点集群,其中标记为 foo:bar 的 3 个 Pod 分别位于 node1、node2 和 node3 中:

如果希望新来的 Pod 均匀分布在现有的可用区域,则可以按如下设置其约束:

kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

topologyKey: zone 意味着均匀分布将只应用于存在标签键值对为 zone:<any value> 的节点。 whenUnsatisfiable: DoNotSchedule 告诉调度器如果新的 Pod 不满足约束,则不可调度。如果调度器将新的 Pod 放入 "zoneA",Pods 分布将变为 [3, 1],因此实际的偏差为 2(3 - 1),这违反了 maxSkew: 1 的约定。此示例中,新 Pod 只能放置在 "zoneB" 上:

或者

你可以调整 Pod 约束以满足各种要求:

  • 将 maxSkew 更改为更大的值,比如 "2",这样新的 Pod 也可以放在 "zoneA" 上。
  • 将 topologyKey 更改为 "node",以便将 Pod 均匀分布在节点上而不是区域中。在上面的例子中,如果 maxSkew 保持为 "1",那么传入的 Pod 只能放在 "node4" 上。
  • 将 whenUnsatisfiable: DoNotSchedule 更改为 whenUnsatisfiable: ScheduleAnyway, 以确保新的 Pod 可以被调度。

多个 TopologySpreadConstraint

上面是单个 Pod 拓扑分布约束的情况,下面的例子建立在前面例子的基础上来对多个 Pod 拓扑分布约束进行说明。假设你拥有一个 4 节点集群,其中 3 个标记为 foo:bar 的 Pod 分别位于 node1、node2 和 node3 上:

可以使用 2 个 TopologySpreadConstraint 来控制 Pod 在 区域和节点两个维度上的分布:

# two-constraints.yaml
kind: Pod
apiVersion: v1
metadata:
  name: mypod
  labels:
    foo: bar
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  - maxSkew: 1
    topologyKey: node
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        foo: bar
  containers:
  - name: pause
    image: k8s.gcr.io/pause:3.1

在这种情况下,为了匹配第一个约束,新的 Pod 只能放置在 "zoneB" 中;而在第二个约束中, 新的 Pod 只能放置在 "node4" 上,最后两个约束的结果加在一起,唯一可行的选择是放置 在 "node4" 上。

多个约束之间可能存在冲突,假设有一个跨越 2 个区域的 3 节点集群:

如果对集群应用 two-constraints.yaml,会发现 "mypod" 处于 Pending 状态,这是因为为了满足第一个约束,"mypod" 只能放在 "zoneB" 中,而第二个约束要求 "mypod" 只能放在 "node2" 上,Pod 调度无法满足这两种约束,所以就冲突了。

为了克服这种情况,你可以增加 maxSkew 或修改其中一个约束,让其使用 whenUnsatisfiable: ScheduleAnyway

集群默认约束

除了为单个 Pod 设置拓扑分布约束,也可以为集群设置默认的拓扑分布约束,默认拓扑分布约束在且仅在以下条件满足 时才会应用到 Pod 上:

  • Pod 没有在其 .spec.topologySpreadConstraints 设置任何约束;
  • Pod 隶属于某个服务、副本控制器、ReplicaSet 或 StatefulSet。

你可以在 调度方案(Schedulingg Profile)中将默认约束作为 PodTopologySpread 插件参数的一部分来进行设置。约束的设置采用和前面 Pod 中的规范一致,只是 labelSelector 必须为空。配置的示例可能看起来像下面这个样子:

apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration

profiles:
  - pluginConfig:
      - name: PodTopologySpread
        args:
          defaultConstraints:
            - maxSkew: 1
              topologyKey: topology.kubernetes.io/zone
              whenUnsatisfiable: ScheduleAnyway
          defaultingType: List

预选

前面了解了如何使用 Pod 拓扑分布约束,接下来我们就可以来看下调度器中对应插件是如何实现的了。

PreFilter

首先也是去查看这个插件的 PreFilter 函数的实现:

// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

// 在 prefilter 扩展点调用
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
 s, err := pl.calPreFilterState(pod)
 if err != nil {
  return framework.NewStatus(framework.Error, err.Error())
 }
 cycleState.Write(preFilterStateKey, s)
 return nil
}

这里最核心的就是 calPreFilterState 函数,该函数用来计算描述如何在拓扑域上传递 Pod 的 preFilterState 状态数据,在了解该函数如何实现之前,我们需要先弄明白 preFilterState 的定义:

// pkg/scheduler/framework/plugins/podtopologyspread/common.go

type topologyPair struct {
 key   string
 value string
}

// 拓扑分布约束定义
type topologySpreadConstraint struct {
 MaxSkew     int32
 TopologyKey string
 Selector    labels.Selector
}

// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

const preFilterStateKey = "PreFilter" + Name

// preFilterState 在 PreFilter 处进行计算,在 Filter 中使用。
// 它结合了 TpKeyToCriticalPaths 和 TpPairToMatchNum 来表示。
// (1) 最少的 Pod 在每个分布约束上匹配的关键路径。
// (2) 在每个分布约束上匹配的 Pod 数量。
// 一个 nil preFilterState 表示没有设置(在 PreFilter 阶段);
// 一个空的 preFilterState 对象是一个合法的状态,在 PreFilter 阶段进行设置。
type preFilterState struct {
  // demo: {{
 //     MaxSkew:     1,
 //     TopologyKey: "zone",
 //     Selector:    ......,
 //    }}
 Constraints []topologySpreadConstraint
  // 这里记录2条关键路径,而不是所有的关键路径。
  // criticalPaths[0].MatchNum 总是保持最小的匹配数。
  // criticalPaths[1].MatchNum 总是大于或等于criticalPaths[0].MatchNum,但不能保证是第2个最小匹配数。
  // demo: {
 //    "zone": {{"zone3", 0}, {"zone2", 2}},
 //    "node": {{"node-b", 1}, {"node-a", 2}},   
 //   }
 TpKeyToCriticalPaths map[string]*criticalPaths
 // TpPairToMatchNum 以 topologyPair 为 key,匹配的 Pods 数量为 value 值
  // demo: {key: "zone", value: "zone1"}: pointer.Int32Ptr(3),
 //    {key: "zone", value: "zone2"}: pointer.Int32Ptr(2),
 //    {key: "zone", value: "zone3"}: pointer.Int32Ptr(0),
  //       {key: "node", value: "node-a"}: pointer.Int32Ptr(2),
 //   {key: "node", value: "node-b"}: pointer.Int32Ptr(1),
 TpPairToMatchNum map[topologyPair]*int32
}

type criticalPaths [2]struct {
 // TopologyValue 拓扑Key对应的拓扑值
 TopologyValue string
 // MatchNum 匹配的 Pod 数量
 MatchNum int32
}

preFilterState 中定义了3个属性,在 PreFilter 处进行计算,在 Filter 中使用:

  • Constraints 用来保存定义的所有拓扑分布约束信息
  • TpKeyToCriticalPaths 是一个 map,以定义的拓扑 Key 为 Key,值是一个 criticalPaths 指针,criticalPaths 的定义不太好理解,是一个两个长度的结构体数组,结构体里面保存的是定义的拓扑对应的 Value 值以及该拓扑下匹配的 Pod 数量,而且需要注意的是这个数组的第一个元素中匹配数量是最小的(其实这里定义一个结构体就可以,只是为了保证获取到的是最小的匹配数量,就定义了两个,第二个是用来临时比较用的,真正有用的是第一个结构体
  • TpPairToMatchNum 同样是一个 map,对应的 Key 是 topologyPair,这个类型其实就是一个拓扑对,Values 值就是这个拓扑对下匹配的 Pod 数

这里可能不是很好理解,我们用测试代码中的一段测试用例来进行说明可能更好理解:

// pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go

{
  name: "normal case with two spreadConstraints",
  pod: st.MakePod().Name("p").Label("foo""").
   SpreadConstraint(1"zone", v1.DoNotSchedule, fooSelector).
   SpreadConstraint(1"node", v1.DoNotSchedule, fooSelector).
   Obj(),
  nodes: []*v1.Node{
   st.MakeNode().Name("node-a").Label("zone""zone1").Label("node""node-a").Obj(),
   st.MakeNode().Name("node-b").Label("zone""zone1").Label("node""node-b").Obj(),
   st.MakeNode().Name("node-x").Label("zone""zone2").Label("node""node-x").Obj(),
   st.MakeNode().Name("node-y").Label("zone""zone2").Label("node""node-y").Obj(),
  },
  existingPods: []*v1.Pod{
   st.MakePod().Name("p-a1").Node("node-a").Label("foo""").Obj(),
   st.MakePod().Name("p-a2").Node("node-a").Label("foo""").Obj(),
   st.MakePod().Name("p-b1").Node("node-b").Label("foo""").Obj(),
   st.MakePod().Name("p-y1").Node("node-y").Label("foo""").Obj(),
   st.MakePod().Name("p-y2").Node("node-y").Label("foo""").Obj(),
   st.MakePod().Name("p-y3").Node("node-y").Label("foo""").Obj(),
   st.MakePod().Name("p-y4").Node("node-y").Label("foo""").Obj(),
  },
  want: &preFilterState{
   Constraints: []topologySpreadConstraint{
    {
     MaxSkew:     1,
     TopologyKey: "zone",
     Selector:    mustConvertLabelSelectorAsSelector(t, fooSelector),
    },
    {
     MaxSkew:     1,
     TopologyKey: "node",
     Selector:    mustConvertLabelSelectorAsSelector(t, fooSelector),
    },
   },
   TpKeyToCriticalPaths: map[string]*criticalPaths{
    "zone": {{"zone1", 3}, {"zone2", 4}},
    "node": {{"node-x", 0}, {"node-b", 1}},
   },
   TpPairToMatchNum: map[topologyPair]*int32{
    {key: "zone", value: "zone1"}:  pointer.Int32Ptr(3),
    {key: "zone", value: "zone2"}:  pointer.Int32Ptr(4),
    {key: "node", value: "node-a"}: pointer.Int32Ptr(2),
    {key: "node", value: "node-b"}: pointer.Int32Ptr(1),
    {key: "node", value: "node-x"}: pointer.Int32Ptr(0),
    {key: "node", value: "node-y"}: pointer.Int32Ptr(4),
   },
  },
 }

理解了 preFilterState  的定义,接下来我们就可以来分析 calPreFilterState 函数的实现了:

// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, error) {
 // 获取所有节点信息
  allNodes, err := pl.sharedLister.NodeInfos().List()
 if err != nil {
  return nil, fmt.Errorf("listing NodeInfos: %v", err)
 }
 var constraints []topologySpreadConstraint
 if len(pod.Spec.TopologySpreadConstraints) > 0 {
  // 如果 Pod 中配置了 TopologySpreadConstraints,转换成这里的 topologySpreadConstraint 对象
  constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
  if err != nil {
   return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %v", err)
  }
 } else {
    // 获取默认配置的拓扑分布约束
  constraints, err = pl.defaultConstraints(pod, v1.DoNotSchedule)
  if err != nil {
   return nil, fmt.Errorf("setting default hard topology spread constraints: %v", err)
  }
 }
  // 没有约束,直接返回
 if len(constraints) == 0 {
  return &preFilterState{}, nil
 }
  // 初始化 preFilterState 状态
 s := preFilterState{
  Constraints:          constraints,
  TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
  TpPairToMatchNum:     make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)),
 }
 for _, n := range allNodes {
  node := n.Node()
  if node == nil {
   klog.Error("node not found")
   continue
  }
    // 如果定义了 NodeAffinity 或者 NodeSelector,则应该分布到这些过滤器的节点
  if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
   continue
  }
  // 保证现在的节点的标签包含 Constraints 中的所有 topologyKeys
  if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
   continue
  }
    // 根据约束初始化拓扑对
  for _, c := range constraints {
   pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
   s.TpPairToMatchNum[pair] = new(int32)
  }
 }
 
 processNode := func(i int) {
  nodeInfo := allNodes[i]
  node := nodeInfo.Node()
    // 计算每一个拓扑对下匹配的 Pod 总数
  for _, constraint := range constraints {
   pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
      tpCount := s.TpPairToMatchNum[pair]
   if tpCount == nil {
    continue
   }
      // 计算约束的拓扑域中匹配的 Pod 数
   count := countPodsMatchSelector(nodeInfo.Pods, constraint.Selector, pod.Namespace)
   atomic.AddInt32(tpCount, int32(count))
  }
 }
 parallelize.Until(context.Background(), len(allNodes), processNode)

 // 计算每个拓扑的最小匹配度(保证第一个Path下是最小的值)
 for i := 0; i < len(constraints); i++ {
  key := constraints[i].TopologyKey
  s.TpKeyToCriticalPaths[key] = newCriticalPaths()
 }
 for pair, num := range s.TpPairToMatchNum {
  s.TpKeyToCriticalPaths[pair.key].update(pair.value, *num)
 }

 return &s, nil
}

// update 函数就是来保证 criticalPaths 中的第一个元素是最小的 Pod 匹配数
func (p *criticalPaths) update(tpVal string, num int32) {
 // first verify if `tpVal` exists or not
 i := -1
 if tpVal == p[0].TopologyValue {
  i = 0
 } else if tpVal == p[1].TopologyValue {
  i = 1
 }
 if i >= 0 {
  // `tpVal` exists
  p[i].MatchNum = num
  if p[0].MatchNum > p[1].MatchNum {
   // swap paths[0] and paths[1]
   p[0], p[1] = p[1], p[0]
  }
 } else {
  // `tpVal` doesn't exist
  if num < p[0].MatchNum {
   // update paths[1] with paths[0]
   p[1] = p[0]
   // update paths[0]
   p[0].TopologyValue, p[0].MatchNum = tpVal, num
  } else if num < p[1].MatchNum {
   // update paths[1]
   p[1].TopologyValue, p[1].MatchNum = tpVal, num
  }
 }
}

首先判断 Pod 中是否定义了 TopologySpreadConstraint ,如果定义了就获取转换成 preFilterState 中的 Constraints,如果没有定义需要查看是否为调度器配置了默认的拓扑分布约束,如果都没有这就直接返回了。

然后循环所有的节点,先根据 NodeAffinity 或者 NodeSelector 进行过滤,然后根据约束中定义的 topologyKeys 过滤节点。

接着计算每个节点下的拓扑对匹配的 Pod 数量,存入 TpPairToMatchNum 中,最后就是要把所有约束中匹配的 Pod 数量最小(或稍大)的放入 TpKeyToCriticalPaths 中保存起来。整个 preFilterState 保存下来传递到后续的插件中使用,比如在 filter 扩展点中同样也注册了这个插件,所以我们可以来查看下在 filter 中是如何实现的。

Filter

在 preFilter 阶段将 Pod 拓扑分布约束的相关信息存入到了 CycleState  中,下面在 filter 阶段中就可以来直接使用这些数据了:

// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 node := nodeInfo.Node()
 if node == nil {
  return framework.NewStatus(framework.Error, "node not found")
 }
  // 获取 preFilsterState
 s, err := getPreFilterState(cycleState)
 if err != nil {
  return framework.NewStatus(framework.Error, err.Error())
 }
 // 如果没有拓扑匹配的数量或者没有约束,则直接返回
 if len(s.TpPairToMatchNum) == 0 || len(s.Constraints) == 0 {
  return nil
 }
  
 podLabelSet := labels.Set(pod.Labels)
  // 循环Pod设置的约束
 for _, c := range s.Constraints {
  tpKey := c.TopologyKey  // 拓扑Key
    // 检查当前节点是否有的对应拓扑Key
  tpVal, ok := node.Labels[c.TopologyKey]  
  if !ok {
   klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
   return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch)
  }
    // 如果拓扑约束的selector匹配pod本身标签,则selfMatchNum=1
  selfMatchNum := int32(0)
  if c.Selector.Matches(podLabelSet) {
   selfMatchNum = 1
  }
    // zone=zoneA   zone=zoneB
    //    p p p        p p
    // 一个拓扑域
  pair := topologyPair{key: tpKey, value: tpVal}
    // 获取指定拓扑key的路径匹配数量
    // [{zoneB 2}]
  paths, ok := s.TpKeyToCriticalPaths[tpKey]
  if !ok {
   klog.Errorf("internal error: get paths from key %q of %#v", tpKey, s.TpKeyToCriticalPaths)
   continue
  }
  
  // 获取最小匹配数量
  minMatchNum := paths[0].MatchNum
  matchNum := int32(0)
    // 获取当前节点所在的拓扑域匹配的Pod数量
  if tpCount := s.TpPairToMatchNum[pair]; tpCount != nil {
   matchNum = *tpCount
  }
    // 如果匹配的Pod数量 + 1或者0 - 最小的匹配数量 > MaxSkew
    // 则证明不满足约束条件
  skew := matchNum + selfMatchNum - minMatchNum
  if skew > c.MaxSkew {
   klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: MatchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.MaxSkew)
   return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch)
  }
 }

 return nil
}

首先通过 CycleState 获取 preFilterState ,如果没有配置约束或者拓扑对匹配数量为0就直接返回了。

然后循环定义的拓扑约束,先检查当前节点是否有对应的 TopologyKey,没有就返回错误,然后判断拓扑对的分布程度是否大于 MaxSkew,判断方式为拓扑中匹配的 Pod 数量 + 1/0(如果 Pod 本身也匹配则为1) - 最小的 Pod 匹配数量 > MaxSkew ,这个也是前面我们在关于 Pod 拓扑分布约束中的 maxSkew 的含义描述的意思**。**

优选

PodTopologySpread 除了在预选阶段会用到,在打分阶段其实也会用到,在默认的插件注册函数中可以看到:

func getDefaultConfig() *schedulerapi.Plugins {
 return &schedulerapi.Plugins{
  ......
  PreFilter: &schedulerapi.PluginSet{
   Enabled: []schedulerapi.Plugin{
    {Name: podtopologyspread.Name},
    ......
   },
  },
  Filter: &schedulerapi.PluginSet{
   Enabled: []schedulerapi.Plugin{
    {Name: podtopologyspread.Name},
    ......
   },
  },
  ......
  PreScore: &schedulerapi.PluginSet{
   Enabled: []schedulerapi.Plugin{
    {Name: podtopologyspread.Name},
    ......
   },
  },
  Score: &schedulerapi.PluginSet{
   Enabled: []schedulerapi.Plugin{
    // Weight is doubled because:
    // - This is a score coming from user preference.
    // - It makes its signal comparable to NodeResourcesLeastAllocated.
    {Name: podtopologyspread.Name, Weight: 2},
    ......
   },
  },
  ......
 }
}

PreScore 与 Score

同样首先需要调用 PreScore 函数进行打分前的一些准备,把打分的数据存储起来:

// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

// preScoreState 在 PreScore 时计算,在 Score 时使用。
type preScoreState struct {
  // 定义的约束
 Constraints []topologySpreadConstraint
 // IgnoredNodes 是一组 miss 掉 Constraints[*].topologyKey 的节点名称
 IgnoredNodes sets.String
 // TopologyPairToPodCounts 以 topologyPair 为键,以匹配的 Pod 数量为值
 TopologyPairToPodCounts map[topologyPair]*int64
 // TopologyNormalizingWeight 是我们给每个拓扑的计数的权重
 // 这使得较小的拓扑的 Pod 数不会被较大的稀释
 TopologyNormalizingWeight []float64
}

// initPreScoreState 迭代 "filteredNodes" 来过滤掉没有设置 topologyKey 的节点,并进行初始化:
// 1) s.TopologyPairToPodCounts: 以符合条件的拓扑对和节点名称为键
// 2) s.IgnoredNodes: 不应得分的节点集合
// 3) s.TopologyNormalizingWeight: 根据拓扑结构中的数值数量给予每个约束的权重
func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node) error {
 var err error
  // 将 Pod 或者默认定义的约束转换到 Constraints 中
 if len(pod.Spec.TopologySpreadConstraints) > 0 {
  s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
  if err != nil {
   return fmt.Errorf("obtaining pod's soft topology spread constraints: %v", err)
  }
 } else {
  s.Constraints, err = pl.defaultConstraints(pod, v1.ScheduleAnyway)
  if err != nil {
   return fmt.Errorf("setting default soft topology spread constraints: %v", err)
  }
 }
 if len(s.Constraints) == 0 {
  return nil
 }
 topoSize := make([]intlen(s.Constraints))
  // 循环过滤节点得到的所有节点
 for _, node := range filteredNodes {
  if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
   // 后面打分时,没有全部所需 topologyKeys 的节点会被忽略
   s.IgnoredNodes.Insert(node.Name)
   continue
  }
    // 循环约束条件
  for i, constraint := range s.Constraints {
   if constraint.TopologyKey == v1.LabelHostname {
    continue
   }
      // 拓扑对  初始化
   pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
   if s.TopologyPairToPodCounts[pair] == nil {
    s.TopologyPairToPodCounts[pair] = new(int64)
    topoSize[i]++  // 拓扑对数量+1
   }
  }
 }
  
 s.TopologyNormalizingWeight = make([]float64len(s.Constraints))
 for i, c := range s.Constraints {
  sz := topoSize[i]  // 拓扑约束数量
  if c.TopologyKey == v1.LabelHostname {
      // 如果 TopologyKey 是 Hostname 标签
   sz = len(filteredNodes) - len(s.IgnoredNodes)
  }
    // 计算拓扑约束的权重
  s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
 }
 return nil
}

// topologyNormalizingWeight 根据拓扑存在的值的数量,计算拓扑的权重。
// 由于<size>至少为1(所有通过 Filters 的节点都在同一个拓扑结构中)
// 而k8s支持5k个节点,所以结果在区间<1.09,8.52>。
//
// 注意:当没有节点具有所需的拓扑结构时,<size> 也可以为0
// 然而在这种情况下,我们并不关心拓扑结构的权重
// 因为我们对所有节点都返回0分。
func topologyNormalizingWeight(size int) float64 {
 return math.Log(float64(size + 2))
}

// PreScore 构建写入 CycleState 用于后面的 Score 和 NormalizeScore 使用
func (pl *PodTopologySpread) PreScore(
 ctx context.Context,
 cycleState *framework.CycleState,
 pod *v1.Pod,
 filteredNodes []*v1.Node,
)
 *framework.Status
 {
  // 获取所有节点
 allNodes, err := pl.sharedLister.NodeInfos().List()
 if err != nil {
  return framework.NewStatus(framework.Error, fmt.Sprintf("error when getting all nodes: %v", err))
 }
  // 过滤后的节点或者当前没有节点,表示没有节点用于打分
 if len(filteredNodes) == 0 || len(allNodes) == 0 {
  return nil
 }
  // 初始化 preScoreState 状态
 state := &preScoreState{
  IgnoredNodes:            sets.NewString(),
  TopologyPairToPodCounts: make(map[topologyPair]*int64),
 }
 err = pl.initPreScoreState(state, pod, filteredNodes)
 if err != nil {
  return framework.NewStatus(framework.Error, fmt.Sprintf("error when calculating preScoreState: %v", err))
 }

 // 如果传入的 pod 没有软拓扑传播约束,则返回
 if len(state.Constraints) == 0 {
  cycleState.Write(preScoreStateKey, state)
  return nil
 }

 processAllNode := func(i int) {
  nodeInfo := allNodes[i]
  node := nodeInfo.Node()
  if node == nil {
   return
  }
  // (1) `node`应满足传入 pod 的 NodeSelector/NodeAffinity
  // (2) 所有的 topologyKeys 都需要存在于`node`中。
  if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
   !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
   return
  }

  for _, c := range state.Constraints {
      // 拓扑对
   pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
   // 如果当前拓扑对没有与任何候选节点相关联,则继续避免不必要的计算
   // 每个节点的计数也被跳过,因为它们是在 Score 期间进行的
   tpCount := state.TopologyPairToPodCounts[pair]
   if tpCount == nil {
    continue
   }
      // 计算节点上匹配的所有 Pod 数量
   count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
   atomic.AddInt64(tpCount, int64(count))
  }
 }
 parallelize.Until(ctx, len(allNodes), processAllNode)

 cycleState.Write(preScoreStateKey, state)
 return nil
}

上面的处理逻辑整体比较简单,最重要的是计算每个拓扑约束的权重,这样才方便后面打分的时候计算分数,存入到 CycleState 后就可以了来查看具体的 Score 函数的实现了:

// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

// 在 Score 扩展点调用
func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
 nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
 if err != nil || nodeInfo.Node() == nil {
  return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
 }

 node := nodeInfo.Node()
 s, err := getPreScoreState(cycleState)
 if err != nil {
  return 0, framework.NewStatus(framework.Error, err.Error())
 }

 // 如果该节点不合格,则返回
 if s.IgnoredNodes.Has(node.Name) {
  return 0nil
 }

 // 每出现一个 <pair>,当前节点就会得到一个 <matchSum> 的分数。
  // 而我们将<matchSum>相加,作为这个节点的分数返回。
 var score float64
 for i, c := range s.Constraints {
  if tpVal, ok := node.Labels[c.TopologyKey]; ok {
   var cnt int64
   if c.TopologyKey == v1.LabelHostname {
        // 如果 TopologyKey 是 Hostname 则 cnt 为节点上匹配约束的 selector 的 Pod 数量
    cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
   } else {
        // 拓扑对下匹配的 Pod 数量
    pair := topologyPair{key: c.TopologyKey, value: tpVal}
    cnt = *s.TopologyPairToPodCounts[pair]
   }
      // 计算当前节点所得分数
   score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
  }
 }
 return int64(score), nil
}

// scoreForCount 根据拓扑域中匹配的豆荚数量、约束的maxSkew和拓扑权重计算得分。
// `maxSkew-1`加到分数中,这样拓扑域之间的差异就会被淡化,控制分数对偏斜的容忍度。
func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
 return float64(cnt)*tpWeight + float64(maxSkew-1)
}

在 Score 阶段就是为当前的节点去计算一个分数,这个分数就是通过拓扑对下匹配的 Pod 数量和对应权重的结果得到的一个分数,另外在计算分数的时候还加上了 maxSkew-1,这样可以淡化拓扑域之间的差异。

NormalizeScore

当所有节点的分数计算完成后,还需要调用 NormalizeScore 扩展插件:

// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

// NormalizeScore 在对所有节点打分过后调用
func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
 s, err := getPreScoreState(cycleState)
 if err != nil {
  return framework.NewStatus(framework.Error, err.Error())
 }
 if s == nil {
  return nil
 }

 // 计算 <minScore> and <maxScore>
 var minScore int64 = math.MaxInt64
 var maxScore int64
 for _, score := range scores {
  if s.IgnoredNodes.Has(score.Name) {
   continue
  }
  if score.Score < minScore {
   minScore = score.Score
  }
  if score.Score > maxScore {
   maxScore = score.Score
  }
 }
  // 循环 scores({node score}集合)
 for i := range scores {
  nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
  if err != nil {
   return framework.NewStatus(framework.Error, err.Error())
  }
  node := nodeInfo.Node()
    // 节点被忽略了,分数记为0
  if s.IgnoredNodes.Has(node.Name) {
   scores[i].Score = 0
   continue
  }
    // 如果 maxScore 为0,指定当前节点的分数为 MaxNodeScore
  if maxScore == 0 {
   scores[i].Score = framework.MaxNodeScore
   continue
  }
    // 计算当前节点分数
  s := scores[i].Score
  scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
 }
 return nil
}

NormalizeScore 扩展是在 Score 扩展执行完成后,为每个 ScorePlugin 并行运行 NormalizeScore 方法,然后并为每个 ScorePlugin 应用评分默认权重,然后总结所有插件调用过后的分数,最后选择一个分数最高的节点。

到这里我们就完成了对 PodTopologySpread 的实现分析,我们利用该特性可以实现对 Pod 更加细粒度的控制,我们可以把 Pod 分布到不同的拓扑域,从而实现高可用性,这也有助于工作负载的滚动更新和平稳地扩展副本。

不过如果对 Deployment 进行缩容操作可能会导致 Pod 的分布不均衡,此外具有污点的节点上的 Pods 也会被统计到。


K8S 进阶训练营


 点击屏末  | 即刻学习
浏览 168
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报