Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ type ContainerManager interface {
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool

// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
// GetPodAdmitHandler returns an instance of a PodAdmitHandler responsible for checking
// if a pod can be admitted.
GetPodAdmitHandler() lifecycle.PodAdmitHandler

// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
GetNodeAllocatableAbsolute() v1.ResourceList
Expand Down
61 changes: 38 additions & 23 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,50 +686,65 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
return cm.deviceManager.UpdatePluginResources(node, attrs)
}

func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
return cm.topologyManager
}
func (cm *containerManagerImpl) GetPodAdmitHandler() lifecycle.PodAdmitHandler {
// TODO: we need to think about a better way to do this. This will work for
// now so long as we have only the cpuManager and deviceManager relying on
// allocations here. However, going forward it is not generalized enough to
// work as we add more and more hint providers that the TopologyManager
// needs to call Allocate() on (that may not be directly instantiated
// inside this component).
return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager, cm.draManager}
return &podAdmitHandler{cm.topologyManager, cm.cpuManager, cm.memoryManager, cm.deviceManager, cm.draManager}
}

type resourceAllocator struct {
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
deviceManager devicemanager.Manager
draManager dra.Manager
type podAdmitHandler struct {
topologyManager topologymanager.Manager
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
deviceManager devicemanager.Manager
draManager dra.Manager
}

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
func (m *podAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
pod := attrs.Pod

for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
// If topology manager is enabled it works as an admit handler for all its hint providers:
// device manager, cpu manager and memory manager
// It doesn't call draManager.Admit as DRA manager is not a hint provider,
// so it should be called separately (see below)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
if result := m.topologyManager.Admit(attrs); !result.Admit {
return result
}

if m.cpuManager != nil {
err = m.cpuManager.Allocate(pod, &container)
} else {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
}

if m.memoryManager != nil {
err = m.memoryManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
if m.cpuManager != nil {
err = m.cpuManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
}

if m.memoryManager != nil {
err = m.memoryManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
}
}
}

// draManager.Admit has to be called independently as DRA Manager
// is not a topology hint provider and topology manager does not
// handle admissions for it
if m.draManager != nil {
return admission.GetPodAdmitResult(m.draManager.Admit(pod))
}

return admission.GetPodAdmitResult(nil)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}

func (cm *containerManagerStub) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
func (cm *containerManagerStub) GetPodAdmitHandler() lifecycle.PodAdmitHandler {
return topologymanager.NewFakeManager()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
func (cm *containerManagerImpl) GetPodAdmitHandler() lifecycle.PodAdmitHandler {
return &noopWindowsResourceAllocator{}
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/kubelet/cm/dra/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,31 @@ func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) {
return manager, nil
}

// Admit checks if pod is in the list of users for the claim
func (m *ManagerImpl) Admit(pod *v1.Pod) error {
for _, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &podResourceClaim)
klog.V(3).InfoS("Check if pod can be admited", "pod", pod.Name, "claim", claimName)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: admited -> admitted

// Query claim object from the API server
resourceClaim, err := m.kubeClient.CoreV1().ResourceClaims(pod.Namespace).Get(context.TODO(), claimName, metav1.GetOptions{})
Comment thread
bart0sh marked this conversation as resolved.
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
}
// Check if pod is in the ReservedFor for the claim
found := false
for _, user := range resourceClaim.Status.ReservedFor {
if user.UID == pod.UID {
found = true
break
}
}
if !found {
return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)", pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
}
}
return nil
}

// Generate container annotations using CDI UpdateAnnotations API
func generateCDIAnnotation(claimUID types.UID, driverName string, cdiDevices []string) ([]kubecontainer.Annotation, error) {
const maxKeyLen = 63 // max length of the CDI annotation key
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/dra/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (

// Manager manages all the DRA resource plugins running on a node.
type Manager interface {
// Admit checks if a pod can be admitted
Admit(pod *v1.Pod) error

// PrepareResources prepares resources for a container in a pod.
// It communicates with the DRA resource plugin to prepare resources and
// returns resource info to trigger CDI injection on the runtime side.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/fake_container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (cm *FakeContainerManager) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}

func (cm *FakeContainerManager) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
func (cm *FakeContainerManager) GetPodAdmitHandler() lifecycle.PodAdmitHandler {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocateResourcesPodAdmitHandler")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)

klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetPodAdmitHandler())

criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
Expand Down