Dissecting How Kubernetes EnableEquivalenceClassCache Improves Scheduler Throughput
Date: 2018-05-17 Source: OSCHINA
Author: xidianwangtao@gmail.com
Equivalence Class: Concept and Significance
The 2015 Google paper on Borg, "Large-scale cluster management at Google with Borg", describes equivalence classes as follows: Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.
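In Kubernetes, equivalence classes are currently used to speed up the Predicate phase of the Scheduler and thereby raise its throughput. The scheduler keeps the Equivalence Cache up to date: when certain events occur (for example, a node is deleted or a pod is bound), it must immediately invalidate the affected entries in the Equivalence Cache.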
An Equivalence Class describes a group of Pods that share the same requirements and constraints. During the Predicate phase, the Scheduler only needs to run the predicates for one Pod of the class and store the result in the Equivalence Cache, where the other Pods of the class (called Equivalent Pods) can reuse it. The normal Predicate flow runs only when the Equivalence Cache holds no reusable Predicate Result.
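Which Pods fall into the same Equivalence Class? By definition, any Pods that share certain fields, such as resource requirements, labels, and affinity, could be treated as Equivalent Pods belonging to the same class. However, users can modify a Pod's fields at any time, so the Scheduler would have to track changes to the Pod's class membership promptly, and any Predicate run already in progress would have to notice the change and adapt, which makes the problem very complicated. For this reason the Scheduler currently groups only Pods with the same OwnerReference (RC, RS, Job, or StatefulSet) into one Equivalence Class. For example, if an RS defines N replicas, those N replica Pods map to one Equivalence Class. The Scheduler computes a uint64 EquivalenceHash for the Equivalent Pods of each class.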
Note that as of Kubernetes 1.10, two RSs with identical Pod Templates still map to two separate Equivalence Classes.
How Equivalence Class Works
To use Equivalence Class you must enable the EnableEquivalenceClassCache Feature Gate. As of Kubernetes 1.10, the feature is still Alpha.
As noted in the Predicate analysis in my earlier blog posts on the scheduler, all successfully registered Predicate Policies are checked during scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...), which calls scheduler.podFitsOnNode(pod, node, predicateFuncs ...) for each node with a bounded degree of parallelism.
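podFitsOnNode takes a pod, a node, and the set of registered predicateFuncs, and checks whether the node satisfies the pod's predicates. With Equivalence Class enabled, the Predicate phase changes as follows:
1. Before running predicates, check whether the pod has a corresponding Equivalence Class.
2. If it does, check whether the Equivalence Cache holds a usable Predicate Result; if it does not have a class, run the full normal predicates.
3. If a usable Predicate Result exists, use the cached result directly to complete the check; otherwise run the full normal predicates.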
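The three steps above can be sketched in a few lines of Go. This is a minimal illustration, not the scheduler's code: predicateFunc, cacheKey, and checkNode are hypothetical names, pods and nodes are reduced to strings, and a flat map stands in for the real cache.

```go
package main

import "fmt"

// predicateFunc is a stand-in for a real scheduler predicate (hypothetical type).
type predicateFunc func(pod, node string) bool

// cacheKey identifies one cached result: node, predicate, equivalence hash.
type cacheKey struct {
	node      string
	predicate string
	hash      uint64
}

// checkNode evaluates the named predicates for one node, consulting the cache
// first when the pod has an equivalence class (hasClass). It returns whether
// the node fits and how many predicates had to be executed for real.
func checkNode(cache map[cacheKey]bool, preds map[string]predicateFunc, order []string,
	pod, node string, hasClass bool, hash uint64) (fits bool, executed int) {
	fits = true
	for _, name := range order {
		key := cacheKey{node: node, predicate: name, hash: hash}
		fit, cached := false, false
		if hasClass {
			fit, cached = cache[key]
		}
		if !cached {
			// Cache miss (or no equivalence class): run the real predicate.
			fit = preds[name](pod, node)
			executed++
			if hasClass {
				cache[key] = fit // make the result reusable by equivalent pods
			}
		}
		if !fit {
			fits = false
		}
	}
	return fits, executed
}

func main() {
	preds := map[string]predicateFunc{
		"PodFitsResources":  func(pod, node string) bool { return true },
		"MatchNodeSelector": func(pod, node string) bool { return node != "node-2" },
	}
	order := []string{"PodFitsResources", "MatchNodeSelector"}
	cache := map[cacheKey]bool{}

	// Two replicas of the same controller share hash 7 (an arbitrary value).
	fits, ran := checkNode(cache, preds, order, "web-1", "node-1", true, 7)
	fmt.Println(fits, ran)
	fits, ran = checkNode(cache, preds, order, "web-2", "node-1", true, 7)
	fmt.Println(fits, ran)
}
```

The first replica pays for both predicates; the second one is answered entirely from the cache, which is where the throughput gain comes from.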
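The Equivalence Cache stores the Predicate Results for every node in a three-level map:
- The first-level key is the node name.
- The second-level key is the predicateKey, which identifies the predicate policy. A node's algorithmCache therefore never holds more entries than the number of Predicate Policies registered with the Scheduler, which bounds the cache size and keeps Equivalence Cache lookups from becoming slow.
- The third-level key is the Equivalence Hash mentioned above.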
For example, algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash] tells whether the Pods corresponding to $equivalenceHash pass the $predicateKey predicate on node $nodeName.
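To make the three levels concrete, here is a small sketch that models the cache as nested Go maps. It is a simplification under stated assumptions: the type names ecache and hostPredicate and the methods update and lookup are illustrative, failure reasons are plain strings, and a plain map replaces the LRU cache used at the second level.

```go
package main

import "fmt"

// hostPredicate mirrors the cached result: whether the predicate passed,
// and the failure reasons if it did not (strings here, for simplicity).
type hostPredicate struct {
	Fit         bool
	FailReasons []string
}

// ecache is a simplified three-level map:
// node name -> predicate key -> equivalence hash -> result.
type ecache map[string]map[string]map[uint64]hostPredicate

// update stores one result, creating the inner maps on demand.
func (c ecache) update(node, predicateKey string, hash uint64, res hostPredicate) {
	if c[node] == nil {
		c[node] = map[string]map[uint64]hostPredicate{}
	}
	if c[node][predicateKey] == nil {
		c[node][predicateKey] = map[uint64]hostPredicate{}
	}
	c[node][predicateKey][hash] = res
}

// lookup returns the cached result and whether it was found.
// Indexing a nil inner map is safe in Go and simply reports a miss.
func (c ecache) lookup(node, predicateKey string, hash uint64) (hostPredicate, bool) {
	res, ok := c[node][predicateKey][hash]
	return res, ok
}

func main() {
	c := ecache{}
	c.update("node-1", "PodFitsResources", 42, hostPredicate{Fit: true})

	res, ok := c.lookup("node-1", "PodFitsResources", 42)
	fmt.Println(res.Fit, ok)

	_, ok = c.lookup("node-2", "PodFitsResources", 42)
	fmt.Println(ok)
}
```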
As of Kubernetes 1.10, the following 20 predicateKeys are supported: MatchInterPodAffinity, CheckVolumeBinding, CheckNodeCondition, GeneralPredicates, HostName, PodFitsHostPorts, MatchNodeSelector, PodFitsResources, NoDiskConflict, PodToleratesNodeTaints, CheckNodeUnschedulable, PodToleratesNodeNoExecuteTaints, CheckNodeLabelPresence, CheckServiceAffinity, MaxEBSVolumeCount, MaxGCEPDVolumeCount, MaxAzureDiskVolumeCount, NoVolumeZoneConflict, CheckNodeMemoryPressure, CheckNodeDiskPressure.
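Note that even when a Pod has a corresponding Equivalence Class, the Equivalence Cache may hold no usable Predicate Result, or the cached result may already have been invalidated. In that case the normal predicate runs and its result is written to the Equivalence Cache.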
How is the Equivalence Cache maintained and updated? Frequently refreshing the entire per-node cache would defeat the purpose of the Equivalence Cache and would not make the Predicate phase any faster.
As described above, the second-level key of the three-level map is the predicateKey, so the Scheduler can invalidate a single Predicate Result instead of blindly invalidating a node's whole algorithmCache.
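The Scheduler watches Add/Update/Delete events for the relevant API objects and invalidates the corresponding Equivalence Cache data according to per-event rules. The details are covered in the source code analysis below.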
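The value of keying the second level by predicateKey shows up in how narrowly entries can be dropped. The following sketch assumes a nested-map model of the cache; the type and method names (results, nodeCache, ecache, invalidate, invalidateOnAllNodes, invalidateNode) are made up for illustration and only loosely correspond to the scheduler's three invalidation helpers.

```go
package main

import "fmt"

type results map[uint64]bool      // equivalence hash -> fit
type nodeCache map[string]results // predicate key -> results
type ecache map[string]nodeCache  // node name -> per-node cache

// invalidate drops the given predicate keys on one node only.
func (c ecache) invalidate(node string, keys ...string) {
	for _, k := range keys {
		delete(c[node], k) // deleting from a nil map is a no-op
	}
}

// invalidateOnAllNodes drops the given predicate keys on every node.
func (c ecache) invalidateOnAllNodes(keys ...string) {
	for node := range c {
		c.invalidate(node, keys...)
	}
}

// invalidateNode drops everything cached for one node (e.g. node deleted).
func (c ecache) invalidateNode(node string) {
	delete(c, node)
}

func main() {
	c := ecache{
		"node-1": {"GeneralPredicates": {7: true}, "NoDiskConflict": {7: true}},
		"node-2": {"GeneralPredicates": {7: false}},
	}

	// A pod was bound to node-1: only GeneralPredicates on node-1 is stale.
	c.invalidate("node-1", "GeneralPredicates")
	fmt.Println(len(c["node-1"]), len(c["node-2"]))

	// node-2 was removed from the cluster.
	c.invalidateNode("node-2")
	fmt.Println(len(c))
}
```

After the first call the NoDiskConflict result on node-1 and everything on node-2 are still reusable, which is the point of invalidating per predicate rather than per node.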
Equivalence Class Source Code Analysis
Equivalence Cache Data Structures
The Equivalence Cache is defined as follows:

    // EquivalenceCache holds:
    // 1. a map of AlgorithmCache with node name as key
    // 2. function to get equivalence pod
    type EquivalenceCache struct {
        sync.RWMutex
        getEquivalencePod algorithm.GetEquivalencePodFunc
        algorithmCache    map[string]AlgorithmCache
    }

    // The AlgorithmCache stores PredicateMap with predicate name as key
    type AlgorithmCache struct {
        // Only consider predicates for now
        predicatesCache *lru.Cache
    }

- The actual cached data lives in the algorithmCache map, keyed by nodeName.
- Each node's Predicate Result cache is stored in AlgorithmCache.predicatesCache, an LRU (Least Recently Used) cache that can hold only a limited number of entries. Kubernetes sets the maximum to 100 (Kubernetes 1.10 ships 20 default Predicate Funcs). LRU is a cache replacement policy: when the cache is full, the least recently used entry is evicted, and the most recently accessed entry is kept at the front. By the principle of locality, recently accessed data is more likely to be accessed again soon, which improves performance.
- predicatesCache is itself a key-value store: the key is the predicateKey and the value is a PredicateMap.
- The PredicateMap key is the uint64 Equivalence Hash and the value is a HostPredicate.
- HostPredicate records the result of matching a Pod against a node for one Predicate Policy. Its structure is:

    // HostPredicate is the cached predicate result
    type HostPredicate struct {
        Fit         bool
        FailReasons []algorithm.PredicateFailureReason
    }
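For readers unfamiliar with LRU eviction, the sketch below implements a tiny LRU cache with the standard library's container/list. It is not the lru.Cache type the scheduler imports; the lru and entry types here are hypothetical and only illustrate the eviction rule described above.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is what each list element carries.
type entry struct {
	key   string
	value interface{}
}

// lru is a minimal, non-thread-safe LRU cache.
type lru struct {
	max   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // key -> element in order
}

func newLRU(max int) *lru {
	return &lru{max: max, order: list.New(), items: map[string]*list.Element{}}
}

// Get returns the value and marks the key as most recently used.
func (c *lru) Get(key string) (interface{}, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).value, true
}

// Add inserts or updates a key and evicts the least recently used
// entry when the cache grows beyond max.
func (c *lru) Add(key string, value interface{}) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).value = value
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, value: value})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

func main() {
	c := newLRU(2)
	c.Add("PodFitsResources", 1)
	c.Add("NoDiskConflict", 2)
	c.Get("PodFitsResources")   // touch: NoDiskConflict is now the oldest
	c.Add("MatchNodeSelector", 3) // evicts NoDiskConflict

	_, ok := c.Get("NoDiskConflict")
	fmt.Println(ok)
}
```

With a limit of 100 entries and only 20 default predicate keys, eviction should rarely if ever trigger in the configuration the article describes; the limit acts as a safety bound.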

Core Operations of the Equivalence Cache

InvalidateCachedPredicateItem: removes from the Equivalence Cache the cached Predicate Results of the given predicate policies on one node, for all EquivalenceHashes (that is, for all Equivalent Pods).

    func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
        ...
        if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
            for predicateKey := range predicateKeys {
                algorithmCache.predicatesCache.Remove(predicateKey)
            }
        }
        ...
    }

InvalidateCachedPredicateItemOfAllNodes: removes the cached Predicate Results of the given set of predicate policies on all nodes, for all EquivalenceHashes.

    func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
        ...
        // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
        for _, algorithmCache := range ec.algorithmCache {
            for predicateKey := range predicateKeys {
                // just use keys is enough
                algorithmCache.predicatesCache.Remove(predicateKey)
            }
        }
        ...
    }

PredicateWithECache: checks whether the Equivalence Cache holds a usable Predicate Result. On a hit, the cached result is returned as the outcome of that Predicate policy for the pod on the node. On a miss, it returns false, an empty reason list, and invalid set to true, which tells the caller to run the real predicate.

    // PredicateWithECache returns:
    // 1. if fit
    // 2. reasons if not fit
    // 3. if this cache is invalid
    // based on cached predicate results
    func (ec *EquivalenceCache) PredicateWithECache(
        podName, nodeName, predicateKey string,
        equivalenceHash uint64, needLock bool,
    ) (bool, []algorithm.PredicateFailureReason, bool) {
        ...
        if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
            if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
                predicateMap := cachePredicate.(PredicateMap)
                // TODO(resouer) Is it possible a race that cache failed to update immediately?
                if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
                    if hostPredicate.Fit {
                        return true, []algorithm.PredicateFailureReason{}, false
                    }
                    return false, hostPredicate.FailReasons, false
                }
                // is invalid
                return false, []algorithm.PredicateFailureReason{}, true
            }
        }
        return false, []algorithm.PredicateFailureReason{}, true
    }

UpdateCachedPredicateItem: when PredicateWithECache misses, the scheduler calls the corresponding Predicate Func to run the real predicate logic and then uses UpdateCachedPredicateItem to write the fresh result into the Equivalence Cache. Each node's predicatesCache is also initialized here.

    // UpdateCachedPredicateItem updates pod predicate for equivalence class
    func (ec *EquivalenceCache) UpdateCachedPredicateItem(
        podName, nodeName, predicateKey string,
        fit bool,
        reasons []algorithm.PredicateFailureReason,
        equivalenceHash uint64,
        needLock bool,
    ) {
        ...
        if _, exist := ec.algorithmCache[nodeName]; !exist {
            ec.algorithmCache[nodeName] = newAlgorithmCache()
        }
        predicateItem := HostPredicate{
            Fit:         fit,
            FailReasons: reasons,
        }
        // if cached predicate map already exists, just update the predicate by key
        if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
            predicateMap := v.(PredicateMap)
            // maps in golang are references, no need to add them back
            predicateMap[equivalenceHash] = predicateItem
        } else {
            ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
                PredicateMap{
                    equivalenceHash: predicateItem,
                })
        }
    }
Initializing the Equivalence Cache
When Kubernetes registers predicates, priorities, and scheduler extenders, it also initializes the Equivalence Cache and passes it into the scheduler config.

    // Creates a scheduler from a set of registered fit predicate keys and priority keys.
    func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
        ...
        // Init equivalence class cache
        if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
            pluginArgs, err := c.getPluginArgs()
            if err != nil {
                return nil, err
            }
            c.equivalencePodCache = core.NewEquivalenceCache(
                getEquivalencePodFuncFactory(*pluginArgs),
            )
            glog.Info("Created equivalence class cache")
        }
        ...
    }

    // NewEquivalenceCache creates a EquivalenceCache object.
    func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
        return &EquivalenceCache{
            getEquivalencePod: getEquivalencePodFunc,
            algorithmCache:    make(map[string]AlgorithmCache),
        }
    }
NewEquivalenceCache initializes the Equivalence Cache, but where is getEquivalencePod registered? The GetEquivalencePodFunc is registered when the default algorithm provider is initialized. (Does this mean only the default provider can be used? Would a config file not work?) Note that only PVCInfo from factory.PluginFactoryArgs is passed in here. GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.

pkg/scheduler/algorithmprovider/defaults/defaults.go:38

    func init() {
        ...
        // Use equivalence class to speed up heavy predicates phase.
        factory.RegisterGetEquivalencePodFunction(
            func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
                return predicates.NewEquivalencePodGenerator(args.PVCInfo)
            },
        )
        ...
    }
Why is only PVCInfo passed in, and why is PVCInfo needed at all? To answer that, look first at the definitions of EquivalencePod and getEquivalencePod.

    // EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
    type EquivalencePod struct {
        ControllerRef metav1.OwnerReference
        PVCSet        sets.String
    }
EquivalencePod defines which shared attributes make Pods equivalent. The Equivalence Hash is computed from the two attributes in a Pod's EquivalencePod:
- ControllerRef: the Pod's meta.OwnerReference, that is, the Controller object that owns the Pod. It can be an RS, RC, Job, or StatefulSet.
- PVCSet: the set of IDs of all PVCs the Pod references.
Therefore, two Pods are considered Equivalent Pods, and share one Equivalence Hash, only if they belong to the same Controller and reference the same PVC objects.
getEquivalencePod builds the EquivalencePod object for a Pod from the OwnerReference and PVC information in the Pod object. It returns nil when the Pod has no controller owner or when its PVC set cannot be resolved.

    func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
        for _, ref := range pod.OwnerReferences {
            if ref.Controller != nil && *ref.Controller {
                pvcSet, err := e.getPVCSet(pod)
                if err == nil {
                    // A pod can only belongs to one controller, so let's return.
                    return &EquivalencePod{
                        ControllerRef: ref,
                        PVCSet:        pvcSet,
                    }
                }
                return nil
            }
        }
        return nil
    }
When Is a Pod's Equivalence Hash Generated
The entry point of the Predicate phase is findNodesThatFit. It calls getEquivalenceClassInfo to compute the Pod's EquivalenceHash and then passes that hash into podFitsOnNode, where the rest of the Equivalence Class logic runs.

    func findNodesThatFit(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
        nodes []*v1.Node,
        predicateFuncs map[string]algorithm.FitPredicate,
        extenders []algorithm.SchedulerExtender,
        metadataProducer algorithm.PredicateMetadataProducer,
        ecache *EquivalenceCache,
        schedulingQueue SchedulingQueue,
        alwaysCheckAllPredicates bool,
    ) ([]*v1.Node, FailedPredicateMap, error) {
        ...
        var equivCacheInfo *equivalenceClassInfo
        if ecache != nil {
            // getEquivalenceClassInfo will return immediately if no equivalence pod found
            equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
        }

        checkNode := func(i int) {
            nodeName := nodes[i].Name
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                nodeNameToInfo[nodeName],
                predicateFuncs,
                ecache,
                schedulingQueue,
                alwaysCheckAllPredicates,
                equivCacheInfo,
            )
            ...
        }
        ...
    }
getEquivalenceClassInfo computes the Pod's EquivalenceHash as follows:

    // getEquivalenceClassInfo returns the equivalence class of given pod.
    func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
        equivalencePod := ec.getEquivalencePod(pod)
        if equivalencePod != nil {
            hash := fnv.New32a()
            hashutil.DeepHashObject(hash, equivalencePod)
            return &equivalenceClassInfo{
                hash: uint64(hash.Sum32()),
            }
        }
        return nil
    }
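As the code shows, the EquivalenceHash is the FNV hash of the EquivalencePod object returned by getEquivalencePod. Note that the hash is a 32-bit FNV-1a value widened to uint64.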
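The idea can be reproduced with the standard library alone. The sketch below is an approximation under stated assumptions: it does not use hashutil.DeepHashObject but instead feeds a controller UID and a sorted list of PVC names into fnv.New32a directly, and the equivalencePod struct and equivalenceHash function are hypothetical simplifications of the real types.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// equivalencePod is a simplified stand-in for the real EquivalencePod:
// the owning controller's UID plus the names of the referenced PVCs.
type equivalencePod struct {
	ControllerUID string
	PVCs          []string
}

// equivalenceHash hashes the controller UID and the sorted PVC names with
// 32-bit FNV-1a and widens the result to uint64, as the scheduler does.
func equivalenceHash(p equivalencePod) uint64 {
	pvcs := append([]string(nil), p.PVCs...) // copy so the caller's slice is untouched
	sort.Strings(pvcs)                       // make the hash independent of PVC order

	h := fnv.New32a()
	h.Write([]byte(p.ControllerUID))
	for _, name := range pvcs {
		h.Write([]byte{0}) // separator so adjacent names cannot run together
		h.Write([]byte(name))
	}
	return uint64(h.Sum32())
}

func main() {
	a := equivalencePod{ControllerUID: "rs-web", PVCs: []string{"data", "logs"}}
	b := equivalencePod{ControllerUID: "rs-web", PVCs: []string{"logs", "data"}}
	c := equivalencePod{ControllerUID: "rs-api"}

	fmt.Println(equivalenceHash(a) == equivalenceHash(b)) // same controller, same PVCs
	fmt.Println(equivalenceHash(a) == equivalenceHash(c)) // different controller
}
```

Because only 32 bits of hash are produced, two different equivalence classes could in principle collide; the widening to uint64 does not add any entropy.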
When Is an Equivalent Pod's Predicate Result Added to the PredicateCache
Let us look at the relevant part of podFitsOnNode:

    func podFitsOnNode(
        pod *v1.Pod,
        meta algorithm.PredicateMetadata,
        info *schedulercache.NodeInfo,
        predicateFuncs map[string]algorithm.FitPredicate,
        ecache *EquivalenceCache,
        queue SchedulingQueue,
        alwaysCheckAllPredicates bool,
        equivCacheInfo *equivalenceClassInfo,
    ) (bool, []algorithm.PredicateFailureReason, error) {
        ...
        if predicate, exist := predicateFuncs[predicateKey]; exist {
            // Use an in-line function to guarantee invocation of ecache.Unlock()
            // when the in-line function returns.
            func() {
                var invalid bool
                if eCacheAvailable {
                    // Lock ecache here to avoid a race condition against cache invalidation invoked
                    // in event handlers. This race has existed despite locks in equivClassCache implementation.
                    ecache.Lock()
                    defer ecache.Unlock()
                    // PredicateWithECache will return its cached predicate results.
                    fit, reasons, invalid = ecache.PredicateWithECache(
                        pod.GetName(), info.Node().GetName(),
                        predicateKey, equivCacheInfo.hash, false)
                }

                if !eCacheAvailable || invalid {
                    // we need to execute predicate functions since equivalence cache does not work
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                    if err != nil {
                        return
                    }

                    if eCacheAvailable {
                        // Store data to update equivClassCache after this loop.
                        if res, exists := predicateResults[predicateKey]; exists {
                            res.Fit = res.Fit && fit
                            res.FailReasons = append(res.FailReasons, reasons...)
                            predicateResults[predicateKey] = res
                        } else {
                            predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
                        }
                        result := predicateResults[predicateKey]
                        ecache.UpdateCachedPredicateItem(
                            pod.GetName(), info.Node().GetName(),
                            predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
                    }
                }
            }()
        ...
    }
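podFitsOnNode first calls PredicateWithECache to check whether the Equivalence Cache has a hit:
- On a hit, that Predicate Policy is considered done.
- On a miss, the predicate is actually invoked and its result is added to or updated in the cache through UpdateCachedPredicateItem.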
Maintaining the Equivalence Cache
Back in the Scheduler Config Factory, let us look at what the EventHandlers registered on the podInformer, nodeInformer, serviceInformer, pvcInformer, and others do to the Equivalence Cache.
Assume Pod
After a pod has been scheduled and before it is bound to the node, the scheduler first assumes the pod. The assume step touches the Equivalence Cache.

    // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
    // assume modifies `assumed`.
    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
        ...
        // Optimistically assume that the binding will succeed, so we need to invalidate affected
        // predicates in equivalence cache.
        // If the binding fails, these invalidated item will not break anything.
        if sched.config.Ecache != nil {
            sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
        }
        return nil
    }
Assume Pod calls InvalidateCachedPredicateItemForPodAdd to update the Equivalence Cache.

    func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
        // GeneralPredicates: will always be affected by adding a new pod
        invalidPredicates := sets.NewString("GeneralPredicates")

        // MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
        for _, vol := range pod.Spec.Volumes {
            if vol.PersistentVolumeClaim != nil {
                invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
            } else {
                if vol.AWSElasticBlockStore != nil {
                    invalidPredicates.Insert("MaxEBSVolumeCount")
                }
                if vol.GCEPersistentDisk != nil {
                    invalidPredicates.Insert("MaxGCEPDVolumeCount")
                }
                if vol.AzureDisk != nil {
                    invalidPredicates.Insert("MaxAzureDiskVolumeCount")
                }
            }
        }
        ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
    }
InvalidateCachedPredicateItemForPodAdd shows that Assume Pod removes the predicatesCache entries for the following predicateKeys on the target node:
- GeneralPredicates, always.
- MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount, if the pod references any PVC.
- MaxEBSVolumeCount, if a pod volume uses AWSElasticBlockStore.
- MaxGCEPDVolumeCount, if a pod volume uses GCEPersistentDisk.
- MaxAzureDiskVolumeCount, if a pod volume uses AzureDisk.
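The rule set above is easy to exercise in isolation. The following sketch assumes a drastically simplified volume type (four booleans instead of v1.Volume) and a hypothetical helper name, predicatesInvalidatedByPodAdd; it only mirrors the decision logic and returns the keys sorted so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// volume is a simplified stand-in for v1.Volume: each flag says which
// volume source is set.
type volume struct {
	PVC, EBS, GCEPD, AzureDisk bool
}

// predicatesInvalidatedByPodAdd returns the predicate keys whose cached
// results become stale on a node when a pod with these volumes is added.
func predicatesInvalidatedByPodAdd(vols []volume) []string {
	// GeneralPredicates is always affected by a new pod.
	set := map[string]bool{"GeneralPredicates": true}
	for _, v := range vols {
		if v.PVC {
			// The backing volume type of a PVC is unknown here,
			// so all three count predicates are invalidated.
			set["MaxEBSVolumeCount"] = true
			set["MaxGCEPDVolumeCount"] = true
			set["MaxAzureDiskVolumeCount"] = true
			continue
		}
		if v.EBS {
			set["MaxEBSVolumeCount"] = true
		}
		if v.GCEPD {
			set["MaxGCEPDVolumeCount"] = true
		}
		if v.AzureDisk {
			set["MaxAzureDiskVolumeCount"] = true
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(predicatesInvalidatedByPodAdd(nil))
	fmt.Println(predicatesInvalidatedByPodAdd([]volume{{EBS: true}}))
	fmt.Println(predicatesInvalidatedByPodAdd([]volume{{PVC: true}}))
}
```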
Update Pod in Scheduled Pod Cache
In NewConfigFactory, the scheduler registers updatePodInCache as the Update event handler for assignedNonTerminatedPod.

    func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
        ...
        c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
        c.podQueue.AssignedPodUpdated(newPod)
    }

    func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
        if c.enableEquivalenceClassCache {
            // if the pod does not have bound node, updating equivalence cache is meaningless;
            // if pod's bound node has been changed, that case should be handled by pod add & delete.
            if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
                if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
                    // MatchInterPodAffinity need to be reconsidered for this node,
                    // as well as all nodes in its same failure domain.
                    c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
                        matchInterPodAffinitySet)
                }
                // if requested container resource changed, invalidate GeneralPredicates of this node
                if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
                    predicates.GetResourceRequest(oldPod)) {
                    c.equivalencePodCache.InvalidateCachedPredicateItem(
                        newPod.Spec.NodeName, generalPredicatesSets)
                }
            }
        }
    }
Through invalidateCachedPredicatesOnUpdatePod, updatePodInCache handles the Equivalence Cache as follows:
- If the pod's labels changed, the MatchInterPodAffinity entries are removed on all nodes.
- If the pod's resource request changed, the GeneralPredicates entry is removed on the pod's node.
Delete Pod in Scheduled Pod Cache
Likewise, when an assignedNonTerminatedPod is deleted, invalidateCachedPredicatesOnDeletePod is called to update the Equivalence Cache.

    func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
        if c.enableEquivalenceClassCache {
            // part of this case is the same as pod add.
            c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
            // MatchInterPodAffinity need to be reconsidered for this node,
            // as well as all nodes in its same failure domain.
            // TODO(resouer) can we just do this for nodes in the same failure domain
            c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
                matchInterPodAffinitySet)

            // if this pod have these PV, cached result of disk conflict will become invalid.
            for _, volume := range pod.Spec.Volumes {
                if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
                    volume.RBD != nil || volume.ISCSI != nil {
                    c.equivalencePodCache.InvalidateCachedPredicateItem(
                        pod.Spec.NodeName, noDiskConflictSet)
                }
            }
        }
    }
invalidateCachedPredicatesOnDeletePod updates the Equivalence Cache as follows:
- It removes the GeneralPredicates entry on the pod's node.
- If the pod references any PVC, it removes the MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount entries on that node.
- If a pod volume uses AWSElasticBlockStore, it removes the MaxEBSVolumeCount entry on that node.
- If a pod volume uses GCEPersistentDisk, it removes the MaxGCEPDVolumeCount entry on that node.
- If a pod volume uses AzureDisk, it removes the MaxAzureDiskVolumeCount entry on that node.
- It removes the MatchInterPodAffinity entries on all nodes.
- If a pod volume references GCEPersistentDisk, AWSElasticBlockStore, RBD, or ISCSI, it removes the NoDiskConflict entry on that node.
Update Node
When a node update event occurs, invalidateCachedPredicatesOnNodeUpdate is called to update the Equivalence Cache.

    func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
        if c.enableEquivalenceClassCache {
            // Begin to update equivalence cache based on node update
            // TODO(resouer): think about lazily initialize this set
            invalidPredicates := sets.NewString()

            if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
                invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
            }
            if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
                invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
                for k, v := range oldNode.GetLabels() {
                    // any label can be topology key of pod, we have to invalidate in all cases
                    if v != newNode.GetLabels()[k] {
                        invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
                    }
                    // NoVolumeZoneConflict will only be affected by zone related label change
                    if isZoneRegionLabel(k) {
                        if v != newNode.GetLabels()[k] {
                            invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
                        }
                    }
                }
            }

            oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
            if oldErr != nil {
                glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
            }
            newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
            if newErr != nil {
                glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
            }
            if !reflect.DeepEqual(oldTaints, newTaints) ||
                !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
                invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
            }

            if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
                oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
                newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
                for _, cond := range oldNode.Status.Conditions {
                    oldConditions[cond.Type] = cond.Status
                }
                for _, cond := range newNode.Status.Conditions {
                    newConditions[cond.Type] = cond.Status
                }
                if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
                    invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
                }
                if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
                    invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
                }
                if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
                    oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
                    oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
                    invalidPredicates.Insert(predicates.CheckNodeConditionPred)
                }
            }
            if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
                invalidPredicates.Insert(predicates.CheckNodeConditionPred)
            }
            c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
        }
    }
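So on a node update, the entries for the following predicateKeys are removed from that node's Equivalence Cache:
- GeneralPredicates, if node.Status.Allocatable or the node labels changed.
- CheckServiceAffinity, if the node labels changed.
- MatchInterPodAffinity, if the value of a previously existing node label changed or the label was removed.
- NoVolumeZoneConflict, if the failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed.
- PodToleratesNodeTaints, if the node's taints changed (node.Spec.Taints or the scheduler.alpha.kubernetes.io/taints annotation).
- CheckNodeMemoryPressure, CheckNodeDiskPressure, and CheckNodeCondition, if the corresponding Node Condition changed. CheckNodeCondition is also invalidated when node.Spec.Unschedulable changes.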
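The label-related part of this logic has a subtlety worth seeing in isolation: the loop walks the old labels only, so a label that is merely added does not trigger MatchInterPodAffinity. The sketch below reproduces just that branch with plain maps; invalidatedByLabelChange is a hypothetical helper name, and the two label keys are the zone and region keys named above.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

const (
	zoneLabel   = "failure-domain.beta.kubernetes.io/zone"
	regionLabel = "failure-domain.beta.kubernetes.io/region"
)

// invalidatedByLabelChange returns the predicate keys to invalidate on a
// node when its labels go from oldLabels to newLabels. It mirrors only the
// label branch of the node update handler.
func invalidatedByLabelChange(oldLabels, newLabels map[string]string) []string {
	set := map[string]bool{}
	if !reflect.DeepEqual(oldLabels, newLabels) {
		set["GeneralPredicates"] = true
		set["CheckServiceAffinity"] = true
		// Only labels present before the update are inspected here.
		for k, v := range oldLabels {
			if v != newLabels[k] {
				set["MatchInterPodAffinity"] = true
				if k == zoneLabel || k == regionLabel {
					set["NoVolumeZoneConflict"] = true
				}
			}
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	old := map[string]string{zoneLabel: "us-east-1a", "disk": "ssd"}

	// Zone label changed: all four label-related predicates are stale.
	fmt.Println(invalidatedByLabelChange(old,
		map[string]string{zoneLabel: "us-east-1b", "disk": "ssd"}))

	// A label was only added: the old labels are unchanged.
	fmt.Println(invalidatedByLabelChange(old,
		map[string]string{zoneLabel: "us-east-1a", "disk": "ssd", "gpu": "true"}))
}
```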
Delete Node
When a node delete event occurs, InvalidateAllCachedPredicateItemOfNode is called to update the Equivalence Cache.

    // InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
    func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
        ec.Lock()
        defer ec.Unlock()
        delete(ec.algorithmCache, nodeName)
        glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
    }
So on a node delete, the node's entire algorithmCache is removed from the Equivalence Cache.
Add or Delete PV
When a PV add or delete event occurs, invalidatePredicatesForPv is called to update the Equivalence Cache.

    func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
        // You could have a PVC that points to a PV, but the PV object doesn't exist.
        // So when the PV object gets added, we can recount.
        invalidPredicates := sets.NewString()

        // PV types which impact MaxPDVolumeCountPredicate
        if pv.Spec.AWSElasticBlockStore != nil {
            invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
        }
        if pv.Spec.GCEPersistentDisk != nil {
            invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
        }
        if pv.Spec.AzureDisk != nil {
            invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
        }

        // If PV contains zone related label, it may impact cached NoVolumeZoneConflict
        for k := range pv.Labels {
            if isZoneRegionLabel(k) {
                invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
                break
            }
        }

        if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
            // Add/delete impacts the available PVs to choose from
            invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
        }

        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
    }
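So when a PV is added or deleted, the entries for the following predicateKeys are removed from the Equivalence Cache on all nodes:
- MaxEBSVolumeCount, MaxGCEPDVolumeCount, or MaxAzureDiskVolumeCount, if the PV is of the corresponding type.
- NoVolumeZoneConflict, if the PV carries a zone or region label.
- CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled.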
Update PV
When a PV update event occurs, invalidatePredicatesForPvUpdate is called to update the Equivalence Cache.

    func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
        invalidPredicates := sets.NewString()
        for k, v := range newPV.Labels {
            // If PV update modifies the zone/region labels.
            if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
                invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
                break
            }
        }
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
    }
So when a PV is updated, the entry for the following predicateKey is removed from the Equivalence Cache on all nodes:
- NoVolumeZoneConflict, if the PV's failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed.
Add or Delete PVC

    func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
        // We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod

        // The bound volume type may change
        invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)

        // The bound volume's label may change
        invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)

        if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
            // Add/delete impacts the available PVs to choose from
            invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
        }
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
    }
When a PVC add or delete event occurs, the entries for the following predicateKeys are removed from the Equivalence Cache on all nodes:
- MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount.
- NoVolumeZoneConflict.
- CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled.
Update PVC

    func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
        invalidPredicates := sets.NewString()

        if old.Spec.VolumeName != new.Spec.VolumeName {
            if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
                // PVC volume binding has changed
                invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
            }
            // The bound volume type may change
            invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
        }
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
    }
When a PVC update event occurs, the entries for the following predicateKeys are removed from the Equivalence Cache on all nodes:
- CheckVolumeBinding, if the VolumeScheduling Feature Gate is enabled and the PV bound to the PVC changed.
- MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount, if the PV bound to the PVC changed.
Add or Delete Service

    func (c *configFactory) onServiceAdd(obj interface{}) {
        if c.enableEquivalenceClassCache {
            c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
        }
        c.podQueue.MoveAllToActiveQueue()
    }

    func (c *configFactory) onServiceDelete(obj interface{}) {
        if c.enableEquivalenceClassCache {
            c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
        }
        c.podQueue.MoveAllToActiveQueue()
    }
When a Service add or delete event occurs, the entry for the following predicateKey is removed from the Equivalence Cache on all nodes:
- CheckServiceAffinity.
Update Service

    func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
        if c.enableEquivalenceClassCache {
            // TODO(resouer) We may need to invalidate this for specified group of pods only
            oldService := oldObj.(*v1.Service)
            newService := newObj.(*v1.Service)
            if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
                c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
            }
        }
        c.podQueue.MoveAllToActiveQueue()
    }
When a Service update event occurs, the entry for the following predicateKey is removed from the Equivalence Cache on all nodes:
- CheckServiceAffinity, if the Service's Selector changed.
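Several handlers above rely on reflect.DeepEqual to decide whether anything relevant changed. A small sketch of the Service selector check, assuming a stripped-down service type and a hypothetical helper name, shows one behavior that is easy to overlook: a nil map and an empty map are not deeply equal, so such a transition would count as a change.

```go
package main

import (
	"fmt"
	"reflect"
)

// service is a simplified stand-in for v1.Service with only the selector.
type service struct {
	Selector map[string]string
}

// selectorChanged reports whether an update touched the selector, which is
// the condition for invalidating CheckServiceAffinity on all nodes.
func selectorChanged(oldSvc, newSvc service) bool {
	return !reflect.DeepEqual(oldSvc.Selector, newSvc.Selector)
}

func main() {
	a := service{Selector: map[string]string{"app": "web"}}
	b := service{Selector: map[string]string{"app": "web"}}
	c := service{Selector: map[string]string{"app": "api"}}

	fmt.Println(selectorChanged(a, b)) // same selector content
	fmt.Println(selectorChanged(a, c)) // selector value differs

	// nil versus empty map: DeepEqual treats these as different.
	fmt.Println(selectorChanged(service{}, service{Selector: map[string]string{}}))
}
```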
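Limitations of Equivalence Class
- The hardest part of the Equivalence Class feature is maintaining and updating the Equivalence Cache optimally, so that every update is as fine-grained as possible and always correct. This area still needs work.
- The Equivalence Cache only caches Predicate Results; it does not cache or maintain Priority Results (the community is working on a Map-Reduce-based optimization). Priority Funcs are usually more complex than Predicate Funcs, so supporting them would be even more valuable.
- Equivalence Class currently computes the Equivalence Hash only from a Pod's OwnerReference and PVC information. If it dropped the OwnerReference and instead considered the core fields of the Pod spec, such as resource requests, labels, and affinity, the cache hit rate could be much higher and Predicate performance could improve more noticeably.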
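Summary
Equivalence Class speeds up the Predicate phase of the Kubernetes Scheduler and thereby improves its throughput. Ordinary users do not really need to care about the feature, since current scheduler performance is sufficient for most of them, but users running large-scale AI training workloads may want to keep an eye on it.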