// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. type SchedulingQueue interface { framework.PodNominator Add(pod *v1.Pod) error // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. // The passed-in pods are originally compiled from plugins that want to activate Pods, // by injecting the pods through a reserved CycleState struct (PodsToActivate). Activate(pods map[string]*v1.Pod) // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. // The podSchedulingCycle represents the current scheduling cycle number which can be // returned by calling SchedulingCycle(). AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error // SchedulingCycle returns the current number of scheduling cycle which is // cached by scheduling queue. Normally, incrementing this number whenever // a pod is popped (e.g. called Pop()) is enough. SchedulingCycle() int64 // Pop removes the head of the queue and returns it. It blocks if the // queue is empty and waits until a new item is added to the queue. Pop() (*framework.QueuedPodInfo, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) PendingPods() ([]*v1.Pod, string) // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() // Run starts the goroutines managing the queue. Run() }
// PriorityQueue implements a scheduling queue. // The head of PriorityQueue is the highest priority pending pod. This structure // has two sub queues and a additional data structure, namely: activeQ, // backoffQ and unschedulablePods. // - activeQ holds pods that are being considered for scheduling. // - backoffQ holds pods that moved from unschedulablePods and will move to // activeQ when their backoff periods complete. // - unschedulablePods holds pods that were already attempted for scheduling and // are currently determined to be unschedulable. type PriorityQueue struct { *nominator
stop chanstruct{} clock clock.Clock
// pod initial backoff duration. podInitialBackoffDuration time.Duration // pod maximum backoff duration. podMaxBackoffDuration time.Duration // the maximum time a pod can stay in the unschedulablePods. podMaxInUnschedulablePodsDuration time.Duration
cond sync.Cond
// activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. activeQ *heap.Heap // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // are popped from this heap before the scheduler looks at activeQ podBackoffQ *heap.Heap // unschedulablePods holds pods that have been tried and determined unschedulable. unschedulablePods *UnschedulablePods // schedulingCycle represents sequence number of scheduling cycle and is incremented // when a pod is popped. schedulingCycle int64 // moveRequestCycle caches the sequence number of scheduling cycle when we // received a move request. Unschedulable pods in and before this scheduling // cycle will be put back to activeQueue if we were trying to schedule them // when we received move request. moveRequestCycle int64
clusterEventMap map[framework.ClusterEvent]sets.String // preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins. preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item. closed bool
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder // pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled. pluginMetricsSamplePercent int }
// QueuedPodInfo is a Pod wrapper with additional information related to // the pod's status in the scheduling queue, such as the timestamp when // it's added to the queue. type QueuedPodInfo struct { *PodInfo // The time pod added to the scheduling queue. Timestamp time.Time // Number of schedule attempts before successfully scheduled. // It's used to record the # attempts metric. Attempts int // The time when the pod is added to the queue for the first time. The pod may be added // back to the queue multiple times before it's successfully scheduled. // It shouldn't be updated once initialized. It's used to record the e2e scheduling // latency for a pod. InitialAttemptTimestamp time.Time // If a Pod failed in a scheduling cycle, record the plugin names it failed by. UnschedulablePlugins sets.String // Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not. Gated bool }
// PodInfo is defined in pkg/scheduler/framework/types.go:131.
// PodInfo is a wrapper to a Pod with additional pre-computed information to // accelerate processing. This information is typically immutable (e.g., pre-processed // inter-pod affinity selectors). type PodInfo struct { Pod *v1.Pod RequiredAffinityTerms []AffinityTerm RequiredAntiAffinityTerms []AffinityTerm PreferredAffinityTerms []WeightedAffinityTerm PreferredAntiAffinityTerms []WeightedAffinityTerm }
// Pod is a collection of containers that can run on a host. This resource is created
// by clients and scheduled onto hosts.
type Pod struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the pod.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the pod.
	// This data may not be up to date.
	// Populated by the system.
	// Read-only.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
// Heap is a producer/consumer queue that implements a heap data structure. // It can be used to implement priority queues and similar data structures. type Heap struct { // data stores objects and has a queue that keeps their ordering according // to the heap invariant. data *data // metricRecorder updates the counter when elements of a heap get added or // removed, and it does nothing if it's nil metricRecorder metrics.MetricRecorder } // data is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. type data struct { // items is a map from key of the objects to the objects and their index. // We depend on the property that items in the map are in the queue and vice versa. items map[string]*heapItem // queue implements a heap data structure and keeps the order of elements // according to the heap invariant. The queue keeps the keys of objects stored // in "items". queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and // should be deterministic. keyFunc KeyFunc // lessFunc is used to compare two objects in the heap. lessFunc lessFunc }
// Less is the function used by the activeQ heap algorithm to sort pods. // It sorts pods based on their priority. When priorities are equal, it uses // PodQueueInfo.timestamp. func(pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { p1 := corev1helpers.PodPriority(pInfo1.Pod) p2 := corev1helpers.PodPriority(pInfo2.Pod) return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp)) }
// UnschedulablePods holds pods that cannot be scheduled. This data structure // is used to implement unschedulablePods. type UnschedulablePods struct { // podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo. podInfoMap map[string]*framework.QueuedPodInfo keyFunc func(*v1.Pod)string // unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap // get added or removed, and it does nothing if it's nil. unschedulableRecorder, gatedRecorder metrics.MetricRecorder }
// calculateBackoffDuration is a helper function for calculating the backoffDuration // based on the number of attempts the pod has made. func(p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration { duration := p.podInitialBackoffDuration for i := 1; i < podInfo.Attempts; i++ { // Use subtraction instead of addition or multiplication to avoid overflow. if duration > p.podMaxBackoffDuration-duration { return p.podMaxBackoffDuration } duration += duration } return duration }
// Pop removes the head of the active queue and returns it. It blocks if the // activeQ is empty and waits until a new item is added to the queue. It // increments scheduling cycle when a pod is popped. func(p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) { p.lock.Lock() defer p.lock.Unlock() for p.activeQ.Len() == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the p.closed is set and the condition is broadcast, // which causes this loop to continue and return from the Pop(). if p.closed { returnnil, fmt.Errorf(queueClosed) } p.cond.Wait() } obj, err := p.activeQ.Pop() if err != nil { returnnil, err } pInfo := obj.(*framework.QueuedPodInfo) pInfo.Attempts++ p.schedulingCycle++ return pInfo, nil }
// Add adds a pod to the active queue. It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues func(p *PriorityQueue) Add(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod) gated := pInfo.Gated if added, err := p.addToActiveQ(pInfo); !added { return err } if p.unschedulablePods.get(pod) != nil { klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod)) p.unschedulablePods.delete(pod, gated) } // Delete pod from backoffQ if it is backing off if err := p.podBackoffQ.Delete(pInfo); err == nil { klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod)) } klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() p.addNominatedPodUnlocked(pInfo.PodInfo, nil) p.cond.Broadcast()
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into // the queue, unless it is already in the queue. Normally, PriorityQueue puts // unschedulable pods in `unschedulablePods`. But if there has been a recent move // request, then the pod is put in `podBackoffQ`. func(p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { p.lock.Lock() defer p.lock.Unlock() pod := pInfo.Pod if p.unschedulablePods.get(pod) != nil { return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod)) }
if _, exists, _ := p.activeQ.Get(pInfo); exists { return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod)) } if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod)) }
// Refresh the timestamp since the pod is re-added. pInfo.Timestamp = p.clock.Now()
// If a move request has been received, move it to the BackoffQ, otherwise move // it to unschedulablePods. for plugin := range pInfo.UnschedulablePlugins { metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc() } if p.moveRequestCycle >= podSchedulingCycle { if err := p.podBackoffQ.Add(pInfo); err != nil { return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err) } klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc() } else { p.unschedulablePods.addOrUpdate(pInfo) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods) metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
// Run starts the goroutine to pump from podBackoffQ to activeQ func(p *PriorityQueue) Run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop) }
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ func(p *PriorityQueue) flushBackoffQCompleted() { p.lock.Lock() defer p.lock.Unlock() activated := false for { rawPodInfo := p.podBackoffQ.Peek() if rawPodInfo == nil { break } pInfo := rawPodInfo.(*framework.QueuedPodInfo) pod := pInfo.Pod if p.isPodBackingoff(pInfo) { break } _, err := p.podBackoffQ.Pop() if err != nil { klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } if err := p.activeQ.Add(pInfo); err != nil { klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod)) } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() activated = true } }
// flushUnschedulablePodsLeftover moves pods which stay in unschedulablePods // longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ. func(p *PriorityQueue) flushUnschedulablePodsLeftover() { p.lock.Lock() defer p.lock.Unlock()
var podsToMove []*framework.QueuedPodInfo currentTime := p.clock.Now() for _, pInfo := range p.unschedulablePods.podInfoMap { lastScheduleTime := pInfo.Timestamp if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration { podsToMove = append(podsToMove, pInfo) } }
// NOTE: this function assumes lock has been acquired in caller func(p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) { activated := false for _, pInfo := range podInfoList { // If the event doesn't help making the Pod schedulable, continue. // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit. // In that case, it's desired to move it anyways. iflen(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) { continue } pod := pInfo.Pod if p.isPodBackingoff(pInfo) { if err := p.podBackoffQ.Add(pInfo); err != nil { klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod)) } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() p.unschedulablePods.delete(pod, pInfo.Gated) } } else { gated := pInfo.Gated if added, _ := p.addToActiveQ(pInfo); added { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName) activated = true metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() p.unschedulablePods.delete(pod, gated) } } } p.moveRequestCycle = p.schedulingCycle if activated { p.cond.Broadcast() } }