From 3bcae1492fa8a82c749cab94727736a1f228c0d9 Mon Sep 17 00:00:00 2001 From: Marlow Warnicke Date: Tue, 17 Mar 2026 10:58:22 -0500 Subject: [PATCH 1/4] Chore: Makefile changes for running tests (with-kind, skip image build if exists, kind-load-test-image) --- Makefile | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index f578902..500daa6 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,22 @@ build-dracpu: ## build dracpu clean: ## clean rm -rf "$(OUT_DIR)/" +with-kind: ## run a command with a temporary kind cluster (create if missing, delete on exit if created) + @if [ -z "$$CMD" ]; then \ + echo "CMD is required. Example: CMD='echo hello' $(MAKE) with-kind"; \ + exit 1; \ + fi; \ + created=0; \ + while read -r name; do \ + if [ "$$name" = "$(CLUSTER_NAME)" ]; then created=2; fi; \ + done < <(kind get clusters 2>/dev/null || true); \ + if [ "$$created" -eq 0 ]; then \ + kind create cluster --name ${CLUSTER_NAME} --config hack/kind.yaml; \ + created=1; \ + fi; \ + trap 'if [ "$$created" -eq 1 ]; then kind delete cluster --name ${CLUSTER_NAME}; fi' EXIT; \ + bash -c "$$CMD" + test-unit: ## run tests CGO_ENABLED=1 go test -v -race -count 1 -coverprofile=coverage.out ./pkg/... @@ -96,7 +112,16 @@ export DOCKER_CLI_EXPERIMENTAL=enabled image: ## docker build load docker build . -t ${STAGING_IMAGE_NAME} --load -build-image: ## build image +build-image: ## build image (skip if already exists unless FORCE_BUILD=1) + @if [ "$(FORCE_BUILD)" = "1" ]; then \ + $(MAKE) build-image-force; \ + elif docker image inspect "${IMAGE}" >/dev/null 2>&1; then \ + echo "Image ${IMAGE} already exists; skipping build."; \ + else \ + $(MAKE) build-image-force; \ + fi + +build-image-force: ## force build image docker buildx build . \ --platform="${PLATFORMS}" \ --tag="${IMAGE}" \ @@ -116,6 +141,9 @@ kind-cluster: ## create kind cluster kind-load-image: build-image ## load the current container image into kind kind load docker-image ${IMAGE} ${IMAGE_LATEST} --name ${CLUSTER_NAME} +kind-load-test-image: build-test-image ## load the test image into kind + kind load docker-image ${IMAGE_TEST} --name ${CLUSTER_NAME} + kind-uninstall-cpu-dra: ## remove cpu dra from kind cluster kubectl delete -f install.yaml || true @@ -162,7 +190,16 @@ ifneq ($(DRACPU_E2E_VERBOSE),) endif $(call kind_setup,$(CI_MANIFEST_FILE)) -build-test-image: ## build tests image +build-test-image: ## build tests image (skip if already exists unless FORCE_BUILD=1) + @if [ "$(FORCE_BUILD)" = "1" ]; then \ + $(MAKE) build-test-image-force; \ + elif docker image inspect "${IMAGE_TEST}" >/dev/null 2>&1; then \ + echo "Image ${IMAGE_TEST} already exists; skipping build."; \ + else \ + $(MAKE) build-test-image-force; \ + fi + +build-test-image-force: ## force build tests image docker buildx build . \ --file test/image/Dockerfile \ --platform="${PLATFORMS}" \ From 67c9c8cdd31ea3dfbb4a0b5632034ab3733a4bfb Mon Sep 17 00:00:00 2001 From: Marlow Warnicke Date: Wed, 18 Mar 2026 16:48:49 -0500 Subject: [PATCH 2/4] QoL improvements re Makefile and updates according to breaking things with those edits - cpu_assignment: track exclusive CPU allocations per node so same ID on different nodes are not reported as overlapping derive expected shared pool from target node only increase verifySharedPoolMatches timeout and improve failure msg - e2e_suite: fix BeFailedToCreate to log State.Waiting.Reason instead of State.Terminated.Reason when container is Waiting. - pod: on WaitToBeRunning failure, append pod events hint (type, reason, message, time) for debugging Pending pods. Makefile: - test-e2e: use single cluster (create if missing, run grouped then individual, delete if we created) --- Makefile | 29 +++++++++++++++++++++-- test/e2e/cpu_assignment_test.go | 37 +++++++++++++++++++++-------- test/e2e/e2e_suite_test.go | 2 +- test/e2e/sharing_test.go | 8 +++---- test/pkg/pod/pod.go | 41 ++++++++++++++++++++++++++++++++- 5 files changed, 99 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 500daa6..7a26eb1 100644 --- a/Makefile +++ b/Makefile @@ -212,11 +212,36 @@ build-test-dracputester: ## build helper to serve as entry point and report cpu build-test-dracpuinfo: ## build helper to expose hardware info in the internal dracpu format go build -v -o "$(OUT_DIR)/dracpuinfo" ./test/image/dracpuinfo -test-e2e: ## run e2e test against an existing configured cluster - env DRACPU_E2E_TEST_IMAGE=$(IMAGE_TEST) DRACPU_E2E_RESERVED_CPUS=$(DRACPU_E2E_RESERVED_CPUS) go test -v ./test/e2e/ --ginkgo.v +# E2E: shared setup and run targets (assume cluster exists; use with-kind for "create if missing, delete if we created"). +define e2e_daemonset_patch +[{"op":"replace","path":"/spec/template/spec/containers/0/args","value":["/dracpu","--v=4","--cpu-device-mode=$(1)","--reserved-cpus=$(DRACPU_E2E_RESERVED_CPUS)"]}] +endef + +kind-e2e-setup: ## label workers, load test image, install DRA, wait for rollout (cluster must exist) + kubectl label nodes -l '!node-role.kubernetes.io/control-plane' node-role.kubernetes.io/worker='' --overwrite + $(MAKE) kind-load-test-image kind-install-cpu-dra + kubectl -n kube-system rollout status daemonset/dracpu --timeout=120s + +test-e2e-grouped-run: ## patch daemonset to grouped and run e2e (requires kind-e2e-setup) + kubectl -n kube-system patch daemonset dracpu --type=json -p='$(call e2e_daemonset_patch,grouped)' + env DRACPU_E2E_TEST_IMAGE=$(IMAGE_TEST) DRACPU_E2E_RESERVED_CPUS=$(DRACPU_E2E_RESERVED_CPUS) DRACPU_E2E_CPU_DEVICE_MODE=grouped go test -v ./test/e2e/ --ginkgo.v + +test-e2e-individual-run: ## patch daemonset to individual and run e2e (requires kind-e2e-setup) + kubectl -n kube-system patch daemonset dracpu --type=json -p='$(call e2e_daemonset_patch,individual)' + kubectl -n kube-system rollout status daemonset/dracpu --timeout=120s + env DRACPU_E2E_TEST_IMAGE=$(IMAGE_TEST) DRACPU_E2E_RESERVED_CPUS=$(DRACPU_E2E_RESERVED_CPUS) DRACPU_E2E_CPU_DEVICE_MODE=individual go test -v ./test/e2e/ --ginkgo.v + +test-e2e: ## run e2e in both grouped and individual mode (one cluster: create if missing, run both, delete if we created) + CMD='$(MAKE) kind-e2e-setup; $(MAKE) test-e2e-grouped-run; r1=$$?; $(MAKE) test-e2e-individual-run; r2=$$?; exit $$((r1|r2))' $(MAKE) with-kind test-e2e-kind: ci-kind-setup test-e2e ## run e2e test against a purpose-built kind cluster +test-e2e-grouped-mode: ## run e2e tests in grouped mode (with-kind) + CMD='$(MAKE) kind-e2e-setup test-e2e-grouped-run' $(MAKE) with-kind + +test-e2e-individual-mode: ## run e2e tests in individual mode (with-kind) + CMD='$(MAKE) kind-e2e-setup test-e2e-individual-run' $(MAKE) with-kind + lint: ## run the linter against the codebase $(GOLANGCI_LINT) run ./... diff --git a/test/e2e/cpu_assignment_test.go b/test/e2e/cpu_assignment_test.go index cfa3b42..bb5c37d 100644 --- a/test/e2e/cpu_assignment_test.go +++ b/test/e2e/cpu_assignment_test.go @@ -184,7 +184,8 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. fxt.Log.Info("Creating pods requesting exclusive CPUs", "numPods", numPods, "cpusPerClaim", cpusPerClaim) var exclPods []*v1.Pod - allAllocatedCPUs := cpuset.New() + // CPU IDs are per-node; track allocations per node so we don't treat same IDs on different nodes as overlapping. + allAllocatedCPUsByNode := make(map[string]cpuset.CPUSet) claimTemplate := resourcev1.ResourceClaimTemplate{ ObjectMeta: metav1.ObjectMeta{ @@ -205,18 +206,34 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. fixture.By("Verifying CPU allocations for each exclusive pod") for i, pod := range exclPods { - alloc := getTesterPodCPUAllocation(fxt.K8SClientset, ctx, pod) + var alloc CPUAllocation + nodeName := pod.Spec.NodeName + gomega.Eventually(func() error { + alloc = getTesterPodCPUAllocation(fxt.K8SClientset, ctx, pod) + if alloc.CPUAssigned.Size() != cpusPerClaim { + return fmt.Errorf("pod %d: got %d CPUs, want %d", i, alloc.CPUAssigned.Size(), cpusPerClaim) + } + if !alloc.CPUAssigned.IsSubsetOf(availableCPUs) { + return fmt.Errorf("pod %d: CPUs %s outside available set %s", i, alloc.CPUAssigned.String(), availableCPUs.String()) + } + allocatedOnNode := allAllocatedCPUsByNode[nodeName] + if allocatedOnNode.Intersection(alloc.CPUAssigned).Size() != 0 { + return fmt.Errorf("pod %d: overlapping CPUs with previous pods on node %s", i, nodeName) + } + return nil + }).WithTimeout(2*time.Minute).WithPolling(5*time.Second).Should(gomega.Succeed(), "exclusive pod %d allocation did not stabilize", i) fxt.Log.Info("Checking exclusive CPU allocation", "pod", e2epod.Identify(pod), "cpuAllocated", alloc.CPUAssigned.String()) - gomega.Expect(alloc.CPUAssigned.Size()).To(gomega.Equal(cpusPerClaim), "Pod %d did not get %d CPUs", i, cpusPerClaim) - gomega.Expect(alloc.CPUAssigned.IsSubsetOf(availableCPUs)).To(gomega.BeTrue(), "Pod %d got CPUs outside available set", i) - gomega.Expect(allAllocatedCPUs.Intersection(alloc.CPUAssigned).Size()).To(gomega.Equal(0), "Pod %d has overlapping CPUs", i) - allAllocatedCPUs = allAllocatedCPUs.Union(alloc.CPUAssigned) + allAllocatedCPUsByNode[nodeName] = allAllocatedCPUsByNode[nodeName].Union(alloc.CPUAssigned) } - gomega.Expect(allAllocatedCPUs.Size()).To(gomega.Equal(numPods * cpusPerClaim)) - rootFxt.Log.Info("All exclusive allocation", "pod", "exclusive CPUs", allAllocatedCPUs.String(), "expected Shared CPUs", availableCPUs.Difference(allAllocatedCPUs).String()) + var totalAllocated int + for _, cpus := range allAllocatedCPUsByNode { + totalAllocated += cpus.Size() + } + gomega.Expect(totalAllocated).To(gomega.Equal(numPods * cpusPerClaim)) + rootFxt.Log.Info("All exclusive allocation", "byNode", allAllocatedCPUsByNode, "expected Shared CPUs on target", availableCPUs.Difference(allAllocatedCPUsByNode[targetNode.Name]).String()) fixture.By("checking the shared pool does not include anymore the exclusively allocated CPUs") - expectedSharedCPUs := availableCPUs.Difference(allAllocatedCPUs) + expectedSharedCPUs := availableCPUs.Difference(allAllocatedCPUsByNode[targetNode.Name]) fixture.By("creating a second best-effort reference pod") shrPod2 := mustCreateBestEffortPod(ctx, fxt, targetNode.Name, dracpuTesterImage) @@ -248,5 +265,5 @@ func verifySharedPoolMatches(ctx context.Context, fxt *fixture.Fixture, sharedPo return fmt.Errorf("shared CPUs mismatch: expected %v got %v", expectedSharedCPUs.String(), sharedAllocUpdated.CPUAssigned.String()) } return nil - }).WithTimeout(1*time.Minute).WithPolling(5*time.Second).Should(gomega.Succeed(), "the best-effort tester pod %s does not have access to the exclusively allocated CPUs", e2epod.Identify(sharedPod)) + }).WithTimeout(2*time.Minute).WithPolling(5*time.Second).Should(gomega.Succeed(), "the best-effort tester pod %s CPU pool did not match expected %s", e2epod.Identify(sharedPod), expectedSharedCPUs.String()) } diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index d283450..981d45e 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -66,7 +66,7 @@ func BeFailedToCreate(fxt *fixture.Fixture) types.GomegaMatcher { return false, nil } if cntSt.State.Waiting.Reason != reasonCreateContainerError { - lh.Info("container terminated for different reason", "containerName", cntSt.Name, "reason", cntSt.State.Terminated.Reason) + lh.Info("container waiting for different reason", "containerName", cntSt.Name, "reason", cntSt.State.Waiting.Reason) return false, nil } lh.Info("container creation error", "containerName", cntSt.Name) diff --git a/test/e2e/sharing_test.go b/test/e2e/sharing_test.go index 6b4b85e..878ffdb 100644 --- a/test/e2e/sharing_test.go +++ b/test/e2e/sharing_test.go @@ -147,7 +147,7 @@ var _ = ginkgo.Describe("Claim sharing", ginkgo.Serial, ginkgo.Ordered, ginkgo.C gomega.Expect(fxt.Teardown(ctx)).To(gomega.Succeed()) }) - ginkgo.It("should fail to run pods which share a claim", func(ctx context.Context) { + ginkgo.It("should fail to run pods which share a claim", ginkgo.SpecTimeout(5*time.Minute), func(ctx context.Context) { testPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: fxt.Namespace.Name, @@ -196,10 +196,10 @@ var _ = ginkgo.Describe("Claim sharing", ginkgo.Serial, ginkgo.Ordered, ginkgo.C return nil } return pod - }).WithTimeout(time.Minute).WithPolling(2 * time.Second).Should(BeFailedToCreate(fxt)) + }).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(BeFailedToCreate(fxt)) }) - ginkgo.It("should fail to run a pod with multiple containers which share a claim", ginkgo.Label("negative"), func(ctx context.Context) { + ginkgo.It("should fail to run a pod with multiple containers which share a claim", ginkgo.Label("negative"), ginkgo.SpecTimeout(5*time.Minute), func(ctx context.Context) { testPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: fxt.Namespace.Name, @@ -259,7 +259,7 @@ var _ = ginkgo.Describe("Claim sharing", ginkgo.Serial, ginkgo.Ordered, ginkgo.C return nil } return pod - }).WithTimeout(time.Minute).WithPolling(2 * time.Second).Should(BeFailedToCreate(fxt)) + }).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(BeFailedToCreate(fxt)) }) }) }) diff --git a/test/pkg/pod/pod.go b/test/pkg/pod/pod.go index f6c6f9f..a990297 100644 --- a/test/pkg/pod/pod.go +++ b/test/pkg/pod/pod.go @@ -75,11 +75,50 @@ func RunToCompletion(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) func WaitToBeRunning(ctx context.Context, cs kubernetes.Interface, podNamespace, podName string) error { phase, err := WaitForPhase(ctx, cs, podNamespace, podName, v1.PodRunning) if err != nil { - return fmt.Errorf("pod=%s/%s is not Running; phase=%q; %w", podNamespace, podName, phase, err) + hint := formatPodEventsHint(ctx, cs, podNamespace, podName) + return fmt.Errorf("pod=%s/%s is not Running; phase=%q; %w%s", podNamespace, podName, phase, err, hint) } return nil } +// formatPodEventsHint returns a short hint (pod events, newest first) for debugging Pending pods. Safe to call on failure. +func formatPodEventsHint(ctx context.Context, cs kubernetes.Interface, podNamespace, podName string) string { + eventList, err := cs.CoreV1().Events(podNamespace).List(ctx, metav1.ListOptions{ + FieldSelector: "involvedObject.name=" + podName, + Limit: 15, + }) + if err != nil || len(eventList.Items) == 0 { + return "" + } + var b strings.Builder + b.WriteString("; pod events (newest first): ") + const maxHintLen = 600 + for i := len(eventList.Items) - 1; i >= 0 && b.Len() < maxHintLen; i-- { + e := eventList.Items[i] + if b.Len() > 30 { + b.WriteString("; ") + } + b.WriteString("[") + b.WriteString(string(e.Type)) + b.WriteString("] ") + b.WriteString(e.Reason) + if e.Message != "" { + msg := e.Message + if len(msg) > 120 { + msg = msg[:117] + "..." + } + b.WriteString(": ") + b.WriteString(msg) + } + if !e.LastTimestamp.IsZero() { + b.WriteString(" (") + b.WriteString(e.LastTimestamp.Format("15:04:05")) + b.WriteString(")") + } + } + return b.String() +} + func WaitToBeDeleted(ctx context.Context, cs kubernetes.Interface, podNamespace, podName string) error { immediate := true err := wait.PollUntilContextTimeout(ctx, PollInterval, PollTimeout, immediate, func(ctx2 context.Context) (done bool, err error) { From 74daec71837028ec773860479ea116f72350c7c9 Mon Sep 17 00:00:00 2001 From: Marlow Warnicke Date: Wed, 18 Mar 2026 16:53:25 -0500 Subject: [PATCH 3/4] Fix dracputester CPU affinity to include all CPUs in mask --- pkg/cpuinfo/cpuinfo.go | 23 ++++++++++++++ pkg/cpuinfo/cpuinfo_test.go | 56 ++++++++++++++++++++++++++++++++++ test/image/dracputester/app.go | 11 ++----- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/pkg/cpuinfo/cpuinfo.go b/pkg/cpuinfo/cpuinfo.go index 59bf767..bf33712 100644 --- a/pkg/cpuinfo/cpuinfo.go +++ b/pkg/cpuinfo/cpuinfo.go @@ -576,3 +576,26 @@ func (t *CPUTopology) CPUsPerUncore() int { // Note: this is an approximation that assumes all uncore caches have the same number of CPUs. return t.NumCPUs / t.NumUncoreCache } + +// AffinityMaxCPUID is the upper bound when scanning a scheduler affinity mask (e.g. from +// sched_getaffinity). The number of CPUs visible to the process (e.g. runtime.NumCPU() +// or cgroup size) may be less than the highest CPU ID in the mask, so the scan must +// use a fixed limit to avoid missing CPUs (e.g. 9-13 when cpuset is 2-5,9-13 and +// NumCPU() is 8). +const AffinityMaxCPUID = 2048 + +// AffinityMask is satisfied by kernel affinity masks (e.g. unix.CPUSet) and by test doubles. +type AffinityMask interface { + IsSet(i int) bool +} + +// FromAffinityMask scans the mask from 0 to AffinityMaxCPUID and returns the set of set CPU IDs. +func FromAffinityMask(mask AffinityMask) cpuset.CPUSet { + var allowedCPUs []int + for i := 0; i < AffinityMaxCPUID; i++ { + if mask.IsSet(i) { + allowedCPUs = append(allowedCPUs, i) + } + } + return cpuset.New(allowedCPUs...) +} diff --git a/pkg/cpuinfo/cpuinfo_test.go b/pkg/cpuinfo/cpuinfo_test.go index 9d1ea2a..48708e7 100644 --- a/pkg/cpuinfo/cpuinfo_test.go +++ b/pkg/cpuinfo/cpuinfo_test.go @@ -565,3 +565,59 @@ func TestSMTDetection(t *testing.T) { }) } } + +// affinityMaskFromSet implements AffinityMask for a cpuset (e.g. in tests). +type affinityMaskFromSet struct{ cpus cpuset.CPUSet } + +func (m affinityMaskFromSet) IsSet(i int) bool { return m.cpus.Contains(i) } + +func TestFromAffinityMask(t *testing.T) { + tests := []struct { + name string + mask AffinityMask + want cpuset.CPUSet + }{ + { + name: "empty", + mask: affinityMaskFromSet{cpuset.New()}, + want: cpuset.New(), + }, + { + name: "single CPU", + mask: affinityMaskFromSet{cpuset.New(3)}, + want: cpuset.New(3), + }, + { + name: "contiguous low CPUs", + mask: affinityMaskFromSet{cpuset.New(0, 1, 2, 3)}, + want: cpuset.New(0, 1, 2, 3), + }, + { + name: "split set 2-5 and 9-13 (bug case: high IDs beyond NumCPU())", + mask: affinityMaskFromSet{mustParseCPUSet(t, "2-5,9-13")}, + want: mustParseCPUSet(t, "2-5,9-13"), + }, + { + name: "high CPU IDs only", + mask: affinityMaskFromSet{cpuset.New(100, 101, 200)}, + want: cpuset.New(100, 101, 200), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := FromAffinityMask(tt.mask) + if !got.Equals(tt.want) { + t.Errorf("FromAffinityMask() = %s, want %s", got.String(), tt.want.String()) + } + }) + } +} + +func mustParseCPUSet(t *testing.T, s string) cpuset.CPUSet { + t.Helper() + cpus, err := cpuset.Parse(s) + if err != nil { + t.Fatalf("cpuset.Parse(%q): %v", s, err) + } + return cpus +} diff --git a/test/image/dracputester/app.go b/test/image/dracputester/app.go index aa0383b..004e887 100644 --- a/test/image/dracputester/app.go +++ b/test/image/dracputester/app.go @@ -23,10 +23,10 @@ import ( "os" "os/signal" "path/filepath" - "runtime" "strings" "time" + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" "github.com/kubernetes-sigs/dra-driver-cpu/test/pkg/discovery" "golang.org/x/sys/unix" "k8s.io/utils/cpuset" @@ -55,14 +55,7 @@ func getAffinity() (cpuset.CPUSet, error) { if err != nil { return cpuset.New(), err } - - var allowedCPUs []int - for i := 0; i < runtime.NumCPU(); i++ { - if unixCS.IsSet(i) { - allowedCPUs = append(allowedCPUs, i) - } - } - return cpuset.New(allowedCPUs...), nil + return cpuinfo.FromAffinityMask(&unixCS), nil } func main() { From a583a8d7343ed396607ffaf1b16ffcf198cfb944 Mon Sep 17 00:00:00 2001 From: Marlow Warnicke Date: Thu, 19 Mar 2026 10:32:49 -0500 Subject: [PATCH 4/4] chore: updates according to review - get rid of magic numbers for availableCPUsByNode (still a default) and discover allAllocatedCPUsByNode - Verify shared pool on every node that has exclusive pods. Use unique discovery pod names per node (discovery-pod-) to avoid name clashes on multi-node clusters. - move code to the dracputester app. Add in associated test file (with tests) --- pkg/cpuinfo/cpuinfo.go | 23 ------ pkg/cpuinfo/cpuinfo_test.go | 56 -------------- test/e2e/cpu_assignment_test.go | 113 +++++++++++++++++++--------- test/image/dracputester/app.go | 41 +++++++++- test/image/dracputester/app_test.go | 108 ++++++++++++++++++++++++++ 5 files changed, 225 insertions(+), 116 deletions(-) create mode 100644 test/image/dracputester/app_test.go diff --git a/pkg/cpuinfo/cpuinfo.go b/pkg/cpuinfo/cpuinfo.go index bf33712..59bf767 100644 --- a/pkg/cpuinfo/cpuinfo.go +++ b/pkg/cpuinfo/cpuinfo.go @@ -576,26 +576,3 @@ func (t *CPUTopology) CPUsPerUncore() int { // Note: this is an approximation that assumes all uncore caches have the same number of CPUs. return t.NumCPUs / t.NumUncoreCache } - -// AffinityMaxCPUID is the upper bound when scanning a scheduler affinity mask (e.g. from -// sched_getaffinity). The number of CPUs visible to the process (e.g. runtime.NumCPU() -// or cgroup size) may be less than the highest CPU ID in the mask, so the scan must -// use a fixed limit to avoid missing CPUs (e.g. 9-13 when cpuset is 2-5,9-13 and -// NumCPU() is 8). -const AffinityMaxCPUID = 2048 - -// AffinityMask is satisfied by kernel affinity masks (e.g. unix.CPUSet) and by test doubles. -type AffinityMask interface { - IsSet(i int) bool -} - -// FromAffinityMask scans the mask from 0 to AffinityMaxCPUID and returns the set of set CPU IDs. -func FromAffinityMask(mask AffinityMask) cpuset.CPUSet { - var allowedCPUs []int - for i := 0; i < AffinityMaxCPUID; i++ { - if mask.IsSet(i) { - allowedCPUs = append(allowedCPUs, i) - } - } - return cpuset.New(allowedCPUs...) -} diff --git a/pkg/cpuinfo/cpuinfo_test.go b/pkg/cpuinfo/cpuinfo_test.go index 48708e7..9d1ea2a 100644 --- a/pkg/cpuinfo/cpuinfo_test.go +++ b/pkg/cpuinfo/cpuinfo_test.go @@ -565,59 +565,3 @@ func TestSMTDetection(t *testing.T) { }) } } - -// affinityMaskFromSet implements AffinityMask for a cpuset (e.g. in tests). -type affinityMaskFromSet struct{ cpus cpuset.CPUSet } - -func (m affinityMaskFromSet) IsSet(i int) bool { return m.cpus.Contains(i) } - -func TestFromAffinityMask(t *testing.T) { - tests := []struct { - name string - mask AffinityMask - want cpuset.CPUSet - }{ - { - name: "empty", - mask: affinityMaskFromSet{cpuset.New()}, - want: cpuset.New(), - }, - { - name: "single CPU", - mask: affinityMaskFromSet{cpuset.New(3)}, - want: cpuset.New(3), - }, - { - name: "contiguous low CPUs", - mask: affinityMaskFromSet{cpuset.New(0, 1, 2, 3)}, - want: cpuset.New(0, 1, 2, 3), - }, - { - name: "split set 2-5 and 9-13 (bug case: high IDs beyond NumCPU())", - mask: affinityMaskFromSet{mustParseCPUSet(t, "2-5,9-13")}, - want: mustParseCPUSet(t, "2-5,9-13"), - }, - { - name: "high CPU IDs only", - mask: affinityMaskFromSet{cpuset.New(100, 101, 200)}, - want: cpuset.New(100, 101, 200), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := FromAffinityMask(tt.mask) - if !got.Equals(tt.want) { - t.Errorf("FromAffinityMask() = %s, want %s", got.String(), tt.want.String()) - } - }) - } -} - -func mustParseCPUSet(t *testing.T, s string) cpuset.CPUSet { - t.Helper() - cpus, err := cpuset.Parse(s) - if err != nil { - t.Fatalf("cpuset.Parse(%q): %v", s, err) - } - return cpus -} diff --git a/test/e2e/cpu_assignment_test.go b/test/e2e/cpu_assignment_test.go index bb5c37d..8cfb27d 100644 --- a/test/e2e/cpu_assignment_test.go +++ b/test/e2e/cpu_assignment_test.go @@ -60,13 +60,14 @@ Please note "Serial" is however unavoidable because we manage the shared node st */ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { var ( - rootFxt *fixture.Fixture - targetNode *v1.Node - targetNodeCPUInfo discovery.DRACPUInfo - availableCPUs cpuset.CPUSet - dracpuTesterImage string - reservedCPUs cpuset.CPUSet - cpuDeviceMode string + rootFxt *fixture.Fixture + targetNode *v1.Node + targetNodeCPUInfo discovery.DRACPUInfo + availableCPUs cpuset.CPUSet + availableCPUsByNode map[string]cpuset.CPUSet + dracpuTesterImage string + reservedCPUs cpuset.CPUSet + cpuDeviceMode string ) ginkgo.BeforeAll(func(ctx context.Context) { @@ -106,38 +107,50 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. gomega.Expect(dsReservedCPUs.Equals(reservedCPUs)).To(gomega.BeTrue(), "daemonset reserved cpus %v do not match test reserved cpus %v", dsReservedCPUs.String(), reservedCPUs.String()) rootFxt.Log.Info("daemonset --cpu-device-mode configuration", "mode", cpuDeviceMode) + var workerNodes []*v1.Node if targetNodeName := os.Getenv("DRACPU_E2E_TARGET_NODE"); len(targetNodeName) > 0 { targetNode, err = rootFxt.K8SClientset.CoreV1().Nodes().Get(ctx, targetNodeName, metav1.GetOptions{}) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "cannot get worker node %q: %v", targetNodeName, err) + workerNodes = []*v1.Node{targetNode} } else { gomega.Eventually(func() error { - workerNodes, err := node.FindWorkers(ctx, infraFxt.K8SClientset) - if err != nil { - return err + var findErr error + workerNodes, findErr = node.FindWorkers(ctx, infraFxt.K8SClientset) + if findErr != nil { + return findErr } if len(workerNodes) == 0 { return fmt.Errorf("no worker nodes detected") } - targetNode = workerNodes[0] // pick random one, this is the simplest random pick + targetNode = workerNodes[0] return nil }).WithTimeout(1*time.Minute).WithPolling(5*time.Second).Should(gomega.Succeed(), "failed to find any worker node") } rootFxt.Log.Info("using worker node", "nodeName", targetNode.Name) - infoPod := discovery.MakePod(infraFxt.Namespace.Name, dracpuTesterImage) - infoPod = e2epod.PinToNode(infoPod, targetNode.Name) - infoPod, err = e2epod.RunToCompletion(ctx, infraFxt.K8SClientset, infoPod) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), "cannot create discovery pod: %v", err) - data, err := e2epod.GetLogs(infraFxt.K8SClientset, ctx, infoPod.Namespace, infoPod.Name, infoPod.Spec.Containers[0].Name) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), "cannot get logs from discovery pod: %v", err) - gomega.Expect(json.Unmarshal([]byte(data), &targetNodeCPUInfo)).To(gomega.Succeed()) - - allocatableCPUs := makeCPUSetFromDiscoveredCPUInfo(targetNodeCPUInfo) - availableCPUs = allocatableCPUs.Difference(reservedCPUs) - if reservedCPUs.Size() > 0 { - gomega.Expect(availableCPUs.Intersection(reservedCPUs).Size()).To(gomega.BeZero(), "available cpus %v overlap with reserved cpus %v", availableCPUs.String(), reservedCPUs.String()) + availableCPUsByNode = make(map[string]cpuset.CPUSet) + for _, n := range workerNodes { + infoPod := discovery.MakePod(infraFxt.Namespace.Name, dracpuTesterImage) + infoPod.Name = "discovery-pod-" + n.Name + infoPod = e2epod.PinToNode(infoPod, n.Name) + infoPod, err = e2epod.RunToCompletion(ctx, infraFxt.K8SClientset, infoPod) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "cannot create discovery pod on node %q: %v", n.Name, err) + data, err := e2epod.GetLogs(infraFxt.K8SClientset, ctx, infoPod.Namespace, infoPod.Name, infoPod.Spec.Containers[0].Name) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "cannot get logs from discovery pod: %v", err) + var nodeCPUInfo discovery.DRACPUInfo + gomega.Expect(json.Unmarshal([]byte(data), &nodeCPUInfo)).To(gomega.Succeed()) + if n.Name == targetNode.Name { + targetNodeCPUInfo = nodeCPUInfo + } + allocatable := makeCPUSetFromDiscoveredCPUInfo(nodeCPUInfo) + available := allocatable.Difference(reservedCPUs) + if reservedCPUs.Size() > 0 { + gomega.Expect(available.Intersection(reservedCPUs).Size()).To(gomega.BeZero(), "node %q: available cpus %v overlap reserved %v", n.Name, available.String(), reservedCPUs.String()) + } + availableCPUsByNode[n.Name] = available + rootFxt.Log.Info("checking worker node", "nodeName", n.Name, "coreCount", len(nodeCPUInfo.CPUs), "allocatableCPUs", allocatable.String(), "availableCPUs", available.String()) } - rootFxt.Log.Info("checking worker node", "nodeName", infoPod.Spec.NodeName, "coreCount", len(targetNodeCPUInfo.CPUs), "allocatableCPUs", allocatableCPUs.String(), "reservedCPUs", reservedCPUs.String(), "availableCPUs", availableCPUs.String()) + availableCPUs = availableCPUsByNode[targetNode.Name] }) ginkgo.When("setting resource claims", func() { @@ -213,8 +226,12 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. if alloc.CPUAssigned.Size() != cpusPerClaim { return fmt.Errorf("pod %d: got %d CPUs, want %d", i, alloc.CPUAssigned.Size(), cpusPerClaim) } - if !alloc.CPUAssigned.IsSubsetOf(availableCPUs) { - return fmt.Errorf("pod %d: CPUs %s outside available set %s", i, alloc.CPUAssigned.String(), availableCPUs.String()) + availableOnNode := availableCPUsByNode[nodeName] + if availableOnNode.Size() == 0 { + availableOnNode = availableCPUs + } + if !alloc.CPUAssigned.IsSubsetOf(availableOnNode) { + return fmt.Errorf("pod %d on node %s: CPUs %s outside node available set %s", i, nodeName, alloc.CPUAssigned.String(), availableOnNode.String()) } allocatedOnNode := allAllocatedCPUsByNode[nodeName] if allocatedOnNode.Intersection(alloc.CPUAssigned).Size() != 0 { @@ -232,15 +249,28 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. gomega.Expect(totalAllocated).To(gomega.Equal(numPods * cpusPerClaim)) rootFxt.Log.Info("All exclusive allocation", "byNode", allAllocatedCPUsByNode, "expected Shared CPUs on target", availableCPUs.Difference(allAllocatedCPUsByNode[targetNode.Name]).String()) - fixture.By("checking the shared pool does not include anymore the exclusively allocated CPUs") - expectedSharedCPUs := availableCPUs.Difference(allAllocatedCPUsByNode[targetNode.Name]) - - fixture.By("creating a second best-effort reference pod") - shrPod2 := mustCreateBestEffortPod(ctx, fxt, targetNode.Name, dracpuTesterImage) - verifySharedPoolMatches(ctx, fxt, shrPod2, expectedSharedCPUs) - - ginkgo.By("checking the CPU pool of the best-effort pod created before the pods with CPU resource claims") - verifySharedPoolMatches(ctx, fxt, shrPod1, expectedSharedCPUs) + fixture.By("checking the shared pool does not include anymore the exclusively allocated CPUs on each node with exclusive pods") + var shrPod2 *v1.Pod + var otherNodeSharedPods []*v1.Pod + for nodeName := range allAllocatedCPUsByNode { + availableOnNode := availableCPUsByNode[nodeName] + if availableOnNode.Size() == 0 { + availableOnNode = availableCPUs + } + expectedSharedOnNode := availableOnNode.Difference(allAllocatedCPUsByNode[nodeName]) + if nodeName == targetNode.Name { + fixture.By("creating a second best-effort reference pod on target node") + shrPod2 = mustCreateBestEffortPod(ctx, fxt, targetNode.Name, dracpuTesterImage) + verifySharedPoolMatches(ctx, fxt, shrPod2, expectedSharedOnNode) + ginkgo.By("checking the CPU pool of the best-effort pod created before the pods with CPU resource claims") + verifySharedPoolMatches(ctx, fxt, shrPod1, expectedSharedOnNode) + continue + } + fixture.By("creating best-effort pod on node %s to verify shared pool", nodeName) + shrPodOther := mustCreateBestEffortPod(ctx, fxt, nodeName, dracpuTesterImage) + verifySharedPoolMatches(ctx, fxt, shrPodOther, expectedSharedOnNode) + otherNodeSharedPods = append(otherNodeSharedPods, shrPodOther) + } fixture.By("deleting the pods with exclusive CPUs") for _, pod := range exclPods { @@ -248,7 +278,18 @@ var _ = ginkgo.Describe("CPU Allocation", ginkgo.Serial, ginkgo.Ordered, ginkgo. } verifySharedPoolMatches(ctx, fxt, shrPod1, availableCPUs) - verifySharedPoolMatches(ctx, fxt, shrPod2, availableCPUs) + if shrPod2 != nil { + verifySharedPoolMatches(ctx, fxt, shrPod2, availableCPUs) + } + for _, shrPodOther := range otherNodeSharedPods { + nodeName := shrPodOther.Spec.NodeName + availableOnNode := availableCPUsByNode[nodeName] + if availableOnNode.Size() == 0 { + availableOnNode = availableCPUs + } + verifySharedPoolMatches(ctx, fxt, shrPodOther, availableOnNode) + gomega.Expect(e2epod.DeleteSync(ctx, fxt.K8SClientset, shrPodOther)).To(gomega.Succeed(), "cannot delete helper pod %s", e2epod.Identify(shrPodOther)) + } }) }) }) diff --git a/test/image/dracputester/app.go b/test/image/dracputester/app.go index 004e887..a42126e 100644 --- a/test/image/dracputester/app.go +++ b/test/image/dracputester/app.go @@ -35,6 +35,9 @@ import ( const ( cgroupPath = "fs/cgroup" cpusetFile = "cpuset.cpus.effective" + // affinityScanMax is the upper bound when scanning sched_getaffinity if topology is unavailable. + // Using runtime.NumCPU() would miss CPUs when the cgroup cpuset is non-contiguous (e.g. 2-5,9-13). + affinityScanMax = 2048 ) func cpuSetPath(sysRoot string) string { @@ -49,13 +52,49 @@ func cpuSet(sysRoot string) (cpuset.CPUSet, error) { return cpuset.Parse(strings.TrimSpace(string(data))) } +// affinityMask is satisfied by unix.CPUSet and test doubles. +type affinityMask interface { + IsSet(i int) bool +} + +// affinityFromMask scans the mask from 0 to maxCPUID (exclusive) and returns the set of set CPU IDs. +// If maxCPUID <= 0, affinityScanMax is used. +func affinityFromMask(mask affinityMask, maxCPUID int) cpuset.CPUSet { + if maxCPUID <= 0 { + maxCPUID = affinityScanMax + } + var allowedCPUs []int + for i := 0; i < maxCPUID; i++ { + if mask.IsSet(i) { + allowedCPUs = append(allowedCPUs, i) + } + } + return cpuset.New(allowedCPUs...) +} + +func affinityScanBoundFromTopology(topo *cpuinfo.CPUTopology) int { + if topo == nil || topo.NumCPUs == 0 { + return affinityScanMax + } + cpus := topo.CPUDetails.CPUs() + if cpus.Size() == 0 { + return topo.NumCPUs + } + list := cpus.List() + return list[len(list)-1] + 1 +} + func getAffinity() (cpuset.CPUSet, error) { var unixCS unix.CPUSet err := unix.SchedGetaffinity(os.Getpid(), &unixCS) if err != nil { return cpuset.New(), err } - return cpuinfo.FromAffinityMask(&unixCS), nil + maxCPUID := 0 + if topo, err := cpuinfo.NewSystemCPUInfo().GetCPUTopology(); err == nil { + maxCPUID = affinityScanBoundFromTopology(topo) + } + return affinityFromMask(&unixCS, maxCPUID), nil } func main() { diff --git a/test/image/dracputester/app_test.go b/test/image/dracputester/app_test.go new file mode 100644 index 0000000..106b78a --- /dev/null +++ b/test/image/dracputester/app_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "testing" + + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + "k8s.io/utils/cpuset" +) + +type maskFromSet struct{ cpus cpuset.CPUSet } + +func (m maskFromSet) IsSet(i int) bool { return m.cpus.Contains(i) } + +func TestAffinityFromMask(t *testing.T) { + tests := []struct { + name string + mask affinityMask + maxCPUID int + want cpuset.CPUSet + }{ + { + name: "empty", + mask: maskFromSet{cpuset.New()}, + maxCPUID: 0, + want: cpuset.New(), + }, + { + name: "single CPU", + mask: maskFromSet{cpuset.New(3)}, + maxCPUID: 0, + want: cpuset.New(3), + }, + { + name: "split set 2-5 and 9-13 (bug case: high IDs beyond NumCPU())", + mask: maskFromSet{mustParse(t, "2-5,9-13")}, + maxCPUID: 0, + want: mustParse(t, "2-5,9-13"), + }, + { + name: "high CPU IDs only", + mask: maskFromSet{cpuset.New(100, 101, 200)}, + maxCPUID: 0, + want: cpuset.New(100, 101, 200), + }, + { + name: "explicit bound 20", + mask: maskFromSet{cpuset.New(2, 5, 9)}, + maxCPUID: 20, + want: cpuset.New(2, 5, 9), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := affinityFromMask(tt.mask, tt.maxCPUID) + if !got.Equals(tt.want) { + t.Errorf("affinityFromMask() = %s, want %s", got.String(), tt.want.String()) + } + }) + } +} + +func TestAffinityScanBoundFromTopology(t *testing.T) { + t.Run("nil returns default", func(t *testing.T) { + if got := affinityScanBoundFromTopology(nil); got != affinityScanMax { + t.Errorf("affinityScanBoundFromTopology(nil) = %d, want %d", got, affinityScanMax) + } + }) + t.Run("empty CPUDetails uses NumCPUs", func(t *testing.T) { + topo := &cpuinfo.CPUTopology{NumCPUs: 16, CPUDetails: cpuinfo.CPUDetails{}} + if got := affinityScanBoundFromTopology(topo); got != 16 { + t.Errorf("affinityScanBoundFromTopology() = %d, want 16", got) + } + }) + t.Run("max CPU ID plus one", func(t *testing.T) { + topo := &cpuinfo.CPUTopology{ + NumCPUs: 4, + CPUDetails: cpuinfo.CPUDetails{2: cpuinfo.CPUInfo{}, 3: cpuinfo.CPUInfo{}, 5: cpuinfo.CPUInfo{}, 9: cpuinfo.CPUInfo{}}, + } + if got := affinityScanBoundFromTopology(topo); got != 10 { + t.Errorf("affinityScanBoundFromTopology() = %d, want 10", got) + } + }) +} + +func mustParse(t *testing.T, s string) cpuset.CPUSet { + t.Helper() + cpus, err := cpuset.Parse(s) + if err != nil { + t.Fatalf("cpuset.Parse(%q): %v", s, err) + } + return cpus +}