From 156529d6d1e9892de688a1e6b6a88cc4fc59d50d Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 9 May 2024 02:36:00 +0900 Subject: [PATCH] Reorganize validations for the frameworks and the externalFrameworks (#2130) Signed-off-by: Yuki Iwai --- cmd/kueue/main.go | 39 +---- cmd/kueue/main_test.go | 169 ---------------------- pkg/config/config.go | 2 +- pkg/config/config_test.go | 26 ++-- pkg/config/validation.go | 60 ++++++-- pkg/config/validation_test.go | 99 ++++++++++++- pkg/controller/jobframework/reconciler.go | 19 ++- pkg/controller/jobframework/setup.go | 5 + pkg/controller/jobframework/setup_test.go | 4 + 9 files changed, 183 insertions(+), 240 deletions(-) delete mode 100644 cmd/kueue/main_test.go diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index b19c7b75ae..781ae9cd48 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -32,10 +32,7 @@ import ( corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/validation/field" utilfeature "k8s.io/apiserver/pkg/util/feature" autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" "k8s.io/client-go/discovery" @@ -43,7 +40,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -271,6 +267,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag jobframework.WithKubeServerVersion(serverVersionFetcher), jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions), jobframework.WithEnabledFrameworks(cfg.Integrations), + jobframework.WithEnabledExternalFrameworks(cfg.Integrations.ExternalFrameworks), jobframework.WithManagerName(constants.KueueName), jobframework.WithLabelKeysToCopy(cfg.Integrations.LabelKeysToCopy), } @@ -364,44 +361,10 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) { if err != nil { return options, cfg, err } - - if cfg.Integrations != nil { - var errorlist field.ErrorList - managedKinds := make(sets.Set[string]) - availableFrameworks := jobframework.GetIntegrationsList() - path := field.NewPath("integrations", "frameworks") - for _, framework := range cfg.Integrations.Frameworks { - if cb, found := jobframework.GetIntegration(framework); !found { - errorlist = append(errorlist, field.NotSupported(path, framework, availableFrameworks)) - } else { - if gvk, err := apiutil.GVKForObject(cb.JobType, scheme); err == nil { - managedKinds = managedKinds.Insert(gvk.String()) - } - } - } - - path = field.NewPath("integrations", "externalFrameworks") - for idx, name := range cfg.Integrations.ExternalFrameworks { - if err := jobframework.RegisterExternalJobType(name); err == nil { - gvk, _ := schema.ParseKindArg(name) - if managedKinds.Has(gvk.String()) { - errorlist = append(errorlist, field.Duplicate(path.Index(idx), name)) - } - managedKinds = managedKinds.Insert(gvk.String()) - } - } - - if len(errorlist) > 0 { - err := errorlist.ToAggregate() - return options, cfg, err - } - } - cfgStr, err := config.Encode(scheme, &cfg) if err != nil { return options, cfg, err } setupLog.Info("Successfully loaded configuration", "config", cfgStr) - return options, cfg, nil } diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go deleted file mode 100644 index 882047e4be..0000000000 --- a/cmd/kueue/main_test.go +++ /dev/null @@ -1,169 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "fmt" - "os" - "path/filepath" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/ptr" - - config "sigs.k8s.io/kueue/apis/config/v1beta1" - "sigs.k8s.io/kueue/pkg/controller/jobs/job" -) - -func TestValidateIntegrationsName(t *testing.T) { - tmpDir := t.TempDir() - - integrationsConfig := filepath.Join(tmpDir, "integrations.yaml") - if err := os.WriteFile(integrationsConfig, []byte(` -apiVersion: config.kueue.x-k8s.io/v1beta1 -kind: Configuration -integrations: - frameworks: - - batch/job - externalFrameworks: - - "Foo.v1.example.com" -`), os.FileMode(0600)); err != nil { - t.Fatal(err) - } - - badIntegrationsConfig := filepath.Join(tmpDir, "badIntegrations.yaml") - if err := os.WriteFile(badIntegrationsConfig, []byte(` -apiVersion: config.kueue.x-k8s.io/v1beta1 -kind: Configuration -integrations: - frameworks: - - unregistered/jobframework -`), os.FileMode(0600)); err != nil { - t.Fatal(err) - } - - badIntegrationsConfig2 := filepath.Join(tmpDir, "badIntegrations2.yaml") - if err := os.WriteFile(badIntegrationsConfig2, []byte(` -apiVersion: config.kueue.x-k8s.io/v1beta1 -kind: Configuration -integrations: - frameworks: - - batch/job - externalFrameworks: - - Job.v1.batch -`), os.FileMode(0600)); err != nil { - t.Fatal(err) - } - - enableDefaultInternalCertManagement := &config.InternalCertManagement{ - Enable: ptr.To(true), - WebhookServiceName: ptr.To(config.DefaultWebhookServiceName), - WebhookSecretName: ptr.To(config.DefaultWebhookSecretName), - } - - configCmpOpts := []cmp.Option{ - cmpopts.IgnoreFields(config.Configuration{}, "ControllerManager"), - } - - defaultClientConnection := &config.ClientConnection{ - QPS: ptr.To(config.DefaultClientConnectionQPS), - Burst: ptr.To(config.DefaultClientConnectionBurst), - } - - testcases := []struct { - name string - configFile string - wantConfiguration config.Configuration - wantError error - }{ - { - name: "integrations config", - configFile: integrationsConfig, - wantConfiguration: config.Configuration{ - TypeMeta: metav1.TypeMeta{ - APIVersion: config.GroupVersion.String(), - Kind: "Configuration", - }, - Namespace: ptr.To(config.DefaultNamespace), - ManageJobsWithoutQueueName: false, - InternalCertManagement: enableDefaultInternalCertManagement, - ClientConnection: defaultClientConnection, - Integrations: &config.Integrations{ - // referencing job.FrameworkName ensures the link of job package - // therefore the batch/framework should be registered - Frameworks: []string{job.FrameworkName}, - ExternalFrameworks: []string{"Foo.v1.example.com"}, - PodOptions: &config.PodIntegrationOptions{ - NamespaceSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "kubernetes.io/metadata.name", - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{"kube-system", "kueue-system"}, - }, - }, - }, - PodSelector: &metav1.LabelSelector{}, - }, - }, - QueueVisibility: &config.QueueVisibility{ - UpdateIntervalSeconds: config.DefaultQueueVisibilityUpdateIntervalSeconds, - ClusterQueues: &config.ClusterQueueVisibility{ - MaxCount: config.DefaultClusterQueuesMaxCount, - }, - }, - MultiKueue: &config.MultiKueue{ - GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, - Origin: ptr.To(config.DefaultMultiKueueOrigin), - WorkerLostTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueWorkerLostTimeout}, - }, - }, - }, - { - name: "bad integrations config", - configFile: badIntegrationsConfig, - wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/mxjob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"pod\", \"ray.io/raycluster\", \"ray.io/rayjob\""), - }, - { - name: "bad integrations config 2", - configFile: badIntegrationsConfig2, - wantError: fmt.Errorf("integrations.externalFrameworks[0]: Duplicate value: \"Job.v1.batch\""), - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - _, cfg, err := apply(tc.configFile) - if tc.wantError == nil { - if err != nil { - t.Errorf("Unexpected error:%s", err) - } - if diff := cmp.Diff(tc.wantConfiguration, cfg, configCmpOpts...); diff != "" { - t.Errorf("Unexpected config (-want +got):\n%s", diff) - } - } else { - if err == nil { - t.Errorf("Failed to get expected error") - } else if diff := cmp.Diff(tc.wantError.Error(), err.Error()); diff != "" { - t.Errorf("Unexpected error (-want +got):\n%s", diff) - } - } - }) - } -} diff --git a/pkg/config/config.go b/pkg/config/config.go index c618ae5e64..6ac0603697 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -162,7 +162,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co return options, cfg, err } } - if err := validate(&cfg).ToAggregate(); err != nil { + if err := validate(&cfg, scheme).ToAggregate(); err != nil { return options, cfg, err } addTo(&options, &cfg) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7a12c73d14..1c9e78a565 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -38,12 +38,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" + _ "sigs.k8s.io/kueue/pkg/controller/jobs" "sigs.k8s.io/kueue/pkg/controller/jobs/job" ) func TestLoad(t *testing.T) { - test_scheme := runtime.NewScheme() - err := configapi.AddToScheme(test_scheme) + testScheme := runtime.NewScheme() + err := configapi.AddToScheme(testScheme) if err != nil { t.Fatal(err) } @@ -213,6 +214,7 @@ clientConnection: `), os.FileMode(0600)); err != nil { t.Fatal(err) } + integrationsConfig := filepath.Join(tmpDir, "integrations.yaml") if err := os.WriteFile(integrationsConfig, []byte(` apiVersion: config.kueue.x-k8s.io/v1beta1 @@ -220,9 +222,12 @@ kind: Configuration integrations: frameworks: - batch/job + externalFrameworks: + - Foo.v1.example.com `), os.FileMode(0600)); err != nil { t.Fatal(err) } + queueVisibilityConfig := filepath.Join(tmpDir, "queueVisibility.yaml") if err := os.WriteFile(queueVisibilityConfig, []byte(` apiVersion: config.kueue.x-k8s.io/v1beta1 @@ -234,6 +239,7 @@ queueVisibility: `), os.FileMode(0600)); err != nil { t.Fatal(err) } + podIntegrationOptionsConfig := filepath.Join(tmpDir, "podIntegrationOptions.yaml") if err := os.WriteFile(podIntegrationOptionsConfig, []byte(` apiVersion: config.kueue.x-k8s.io/v1beta1 @@ -268,6 +274,7 @@ multiKueue: `), os.FileMode(0600)); err != nil { t.Fatal(err) } + defaultControlOptions := ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, Metrics: metricsserver.Options{ @@ -656,7 +663,8 @@ multiKueue: Integrations: &configapi.Integrations{ // referencing job.FrameworkName ensures the link of job package // therefore the batch/framework should be registered - Frameworks: []string{job.FrameworkName}, + Frameworks: []string{job.FrameworkName}, + ExternalFrameworks: []string{"Foo.v1.example.com"}, PodOptions: &configapi.PodIntegrationOptions{ NamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -814,7 +822,7 @@ multiKueue: for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - options, cfg, err := Load(test_scheme, tc.configFile) + options, cfg, err := Load(testScheme, tc.configFile) if tc.wantError == nil { if err != nil { t.Errorf("Unexpected error:%s", err) @@ -835,14 +843,14 @@ multiKueue: } func TestEncode(t *testing.T) { - test_scheme := runtime.NewScheme() - err := configapi.AddToScheme(test_scheme) + testScheme := runtime.NewScheme() + err := configapi.AddToScheme(testScheme) if err != nil { t.Fatal(err) } defaultConfig := &configapi.Configuration{} - test_scheme.Default(defaultConfig) + testScheme.Default(defaultConfig) testcases := []struct { name string @@ -853,7 +861,7 @@ func TestEncode(t *testing.T) { { name: "empty", - scheme: test_scheme, + scheme: testScheme, cfg: &configapi.Configuration{}, wantResult: map[string]any{ "apiVersion": "config.kueue.x-k8s.io/v1beta1", @@ -866,7 +874,7 @@ func TestEncode(t *testing.T) { }, { name: "default", - scheme: test_scheme, + scheme: testScheme, cfg: defaultConfig, wantResult: map[string]any{ "apiVersion": "config.kueue.x-k8s.io/v1beta1", diff --git a/pkg/config/validation.go b/pkg/config/validation.go index d88b9e83f9..7638da4fb3 100644 --- a/pkg/config/validation.go +++ b/pkg/config/validation.go @@ -26,11 +26,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + podworkload "sigs.k8s.io/kueue/pkg/controller/jobs/pod" ) const ( @@ -39,17 +45,18 @@ const ( ) var ( - integrationsPath = field.NewPath("integrations") - integrationsFrameworksPath = integrationsPath.Child("frameworks") - podOptionsPath = integrationsPath.Child("podOptions") - namespaceSelectorPath = podOptionsPath.Child("namespaceSelector") - waitForPodsReadyPath = field.NewPath("waitForPodsReady") - requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy") - multiKueuePath = field.NewPath("multiKueue") - fsPreemptionStrategiesPath = field.NewPath("fairSharing", "preemptionStrategies") + integrationsPath = field.NewPath("integrations") + integrationsFrameworksPath = integrationsPath.Child("frameworks") + integrationsExternalFrameworkPath = integrationsPath.Child("externalFrameworks") + podOptionsPath = integrationsPath.Child("podOptions") + namespaceSelectorPath = podOptionsPath.Child("namespaceSelector") + waitForPodsReadyPath = field.NewPath("waitForPodsReady") + requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy") + multiKueuePath = field.NewPath("multiKueue") + fsPreemptionStrategiesPath = field.NewPath("fairSharing", "preemptionStrategies") ) -func validate(c *configapi.Configuration) field.ErrorList { +func validate(c *configapi.Configuration, scheme *runtime.Scheme) field.ErrorList { var allErrs field.ErrorList allErrs = append(allErrs, validateWaitForPodsReady(c)...) @@ -57,7 +64,7 @@ func validate(c *configapi.Configuration) field.ErrorList { allErrs = append(allErrs, validateQueueVisibility(c)...) // Validate PodNamespaceSelector for the pod framework - allErrs = append(allErrs, validateIntegrations(c)...) + allErrs = append(allErrs, validateIntegrations(c, scheme)...) allErrs = append(allErrs, validateMultiKueue(c)...) allErrs = append(allErrs, validateFairSharing(c)...) @@ -124,26 +131,47 @@ func validateQueueVisibility(cfg *configapi.Configuration) field.ErrorList { return allErrs } -func validateIntegrations(c *configapi.Configuration) field.ErrorList { +func validateIntegrations(c *configapi.Configuration, scheme *runtime.Scheme) field.ErrorList { var allErrs field.ErrorList - if c.Integrations == nil { return field.ErrorList{field.Required(integrationsPath, "cannot be empty")} } - if c.Integrations.Frameworks == nil { return field.ErrorList{field.Required(integrationsFrameworksPath, "cannot be empty")} } - allErrs = append(allErrs, validatePodIntegrationOptions(c)...) + managedFrameworks := sets.New[string]() + availableBuiltInFrameworks := jobframework.GetIntegrationsList() + for idx, framework := range c.Integrations.Frameworks { + if cb, found := jobframework.GetIntegration(framework); !found { + allErrs = append(allErrs, field.NotSupported(integrationsFrameworksPath.Index(idx), framework, availableBuiltInFrameworks)) + } else if gvk, err := apiutil.GVKForObject(cb.JobType, scheme); err == nil { + if managedFrameworks.Has(gvk.String()) { + allErrs = append(allErrs, field.Duplicate(integrationsFrameworksPath.Index(idx), framework)) + } else { + managedFrameworks = managedFrameworks.Insert(gvk.String()) + } + } + } + for idx, framework := range c.Integrations.ExternalFrameworks { + gvk, _ := schema.ParseKindArg(framework) + if gvk == nil { + allErrs = append(allErrs, field.Invalid(integrationsExternalFrameworkPath.Index(idx), framework, "must be format, 'Kind.version.group.com'")) + } else if managedFrameworks.Has(gvk.String()) { + allErrs = append(allErrs, field.Duplicate(integrationsExternalFrameworkPath.Index(idx), framework)) + } else { + managedFrameworks = managedFrameworks.Insert(gvk.String()) + } + } + allErrs = append(allErrs, validatePodIntegrationOptions(c)...) return allErrs } func validatePodIntegrationOptions(c *configapi.Configuration) field.ErrorList { var allErrs field.ErrorList - if !slices.Contains(c.Integrations.Frameworks, "pod") { + if !slices.Contains(c.Integrations.Frameworks, podworkload.FrameworkName) { return allErrs } @@ -154,7 +182,7 @@ func validatePodIntegrationOptions(c *configapi.Configuration) field.ErrorList { return field.ErrorList{field.Required(namespaceSelectorPath, "a namespace selector is required")} } - prohibitedNamespaces := []labels.Set{{corev1.LabelMetadataName: "kube-system"}} + prohibitedNamespaces := []labels.Set{{corev1.LabelMetadataName: metav1.NamespaceSystem}} if c.Namespace != nil && *c.Namespace != "" { prohibitedNamespaces = append(prohibitedNamespaces, labels.Set{corev1.LabelMetadataName: *c.Namespace}) diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go index 3a5e21bd96..4e002ae8ab 100644 --- a/pkg/config/validation_test.go +++ b/pkg/config/validation_test.go @@ -24,20 +24,29 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" ) func TestValidate(t *testing.T) { + testScheme := runtime.NewScheme() + if err := configapi.AddToScheme(testScheme); err != nil { + t.Fatal(err) + } + if err := clientgoscheme.AddToScheme(testScheme); err != nil { + t.Fatal(err) + } + defaultQueueVisibility := &configapi.QueueVisibility{ UpdateIntervalSeconds: configapi.DefaultQueueVisibilityUpdateIntervalSeconds, ClusterQueues: &configapi.ClusterQueueVisibility{ MaxCount: configapi.DefaultClusterQueuesMaxCount, }, } - defaultPodIntegrationOptions := &configapi.PodIntegrationOptions{ NamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -50,7 +59,6 @@ func TestValidate(t *testing.T) { }, PodSelector: &metav1.LabelSelector{}, } - defaultIntegrations := &configapi.Integrations{ Frameworks: []string{"batch/job"}, PodOptions: defaultPodIntegrationOptions, @@ -94,6 +102,91 @@ func TestValidate(t *testing.T) { field.Invalid(field.NewPath("queueVisibility").Child("clusterQueues").Child("maxCount"), 4001, fmt.Sprintf("must be less than %d", queueVisibilityClusterQueuesMaxValue)), }, }, + "empty integrations.frameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{}, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "integrations.frameworks", + }, + }, + }, + "unregistered integrations.frameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{ + Frameworks: []string{"unregistered/jobframework"}, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeNotSupported, + Field: "integrations.frameworks[0]", + }, + }, + }, + "duplicate integrations.frameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{ + Frameworks: []string{ + "batch/job", + "batch/job", + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeDuplicate, + Field: "integrations.frameworks[1]", + }, + }, + }, + "duplicate frameworks between integrations.frameworks and integrations.externalFrameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{ + Frameworks: []string{"batch/job"}, + ExternalFrameworks: []string{"Job.v1.batch"}, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeDuplicate, + Field: "integrations.externalFrameworks[0]", + }, + }, + }, + "invalid format integrations.externalFrameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{ + Frameworks: []string{"batch/job"}, + ExternalFrameworks: []string{"invalid"}, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "integrations.externalFrameworks[0]", + }, + }, + }, + "duplicate integrations.externalFrameworks": { + cfg: &configapi.Configuration{ + Integrations: &configapi.Integrations{ + Frameworks: []string{"batch/job"}, + ExternalFrameworks: []string{ + "Foo.v1.example.com", + "Foo.v1.example.com", + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeDuplicate, + Field: "integrations.externalFrameworks[1]", + }, + }, + }, "nil PodIntegrationOptions": { cfg: &configapi.Configuration{ QueueVisibility: defaultQueueVisibility, @@ -377,7 +470,7 @@ func TestValidate(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")); diff != "" { + if diff := cmp.Diff(tc.wantErr, validate(tc.cfg, testScheme), cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")); diff != "" { t.Errorf("Unexpected returned error (-want,+got):\n%s", diff) } }) diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index e28cce418f..5dbcaad7b4 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -73,10 +73,11 @@ type Options struct { WaitForPodsReady bool KubeServerVersion *kubeversion.ServerVersionFetcher // IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND". - IntegrationOptions map[string]any - EnabledFrameworks sets.Set[string] - ManagerName string - LabelKeysToCopy []string + IntegrationOptions map[string]any + EnabledFrameworks sets.Set[string] + EnabledExternalFrameworks sets.Set[string] + ManagerName string + LabelKeysToCopy []string } // Option configures the reconciler. @@ -134,6 +135,16 @@ func WithEnabledFrameworks(i *configapi.Integrations) Option { } } +// WithEnabledExternalFrameworks adds framework names managed by external controller in the Config API. +func WithEnabledExternalFrameworks(exFrameworks []string) Option { + return func(o *Options) { + if len(exFrameworks) == 0 { + return + } + o.EnabledExternalFrameworks = sets.New(exFrameworks...) + } +} + // WithManagerName adds the kueue's manager name. func WithManagerName(n string) Option { return func(o *Options) { diff --git a/pkg/controller/jobframework/setup.go b/pkg/controller/jobframework/setup.go index 39541ad3c4..044f3dd84b 100644 --- a/pkg/controller/jobframework/setup.go +++ b/pkg/controller/jobframework/setup.go @@ -47,6 +47,11 @@ var ( func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error { options := ProcessOptions(opts...) + for fwkName := range options.EnabledExternalFrameworks { + if err := RegisterExternalJobType(fwkName); err != nil { + return err + } + } return ForEachIntegration(func(name string, cb IntegrationCallbacks) error { logger := log.WithValues("jobFrameworkName", name) fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name) diff --git a/pkg/controller/jobframework/setup_test.go b/pkg/controller/jobframework/setup_test.go index 1bf34c9990..46d2d66ce5 100644 --- a/pkg/controller/jobframework/setup_test.go +++ b/pkg/controller/jobframework/setup_test.go @@ -54,6 +54,10 @@ func TestSetupControllers(t *testing.T) { WithEnabledFrameworks(&configapi.Integrations{ Frameworks: []string{"batch/job", "kubeflow.org/mpijob"}, }), + WithEnabledExternalFrameworks([]string{ + "Foo.v1.example.com", + "Bar.v2.example.com", + }), }, mapperGVKs: []schema.GroupVersionKind{ batchv1.SchemeGroupVersion.WithKind("Job"),