Skip to content

Commit

Permalink
Reorganize validations for the frameworks and the externalFrameworks (#…
Browse files Browse the repository at this point in the history
…2130)

Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y authored May 8, 2024
1 parent 54d1c01 commit 156529d
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 240 deletions.
39 changes: 1 addition & 38 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,14 @@ import (
corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand Down Expand Up @@ -271,6 +267,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
jobframework.WithKubeServerVersion(serverVersionFetcher),
jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions),
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithEnabledExternalFrameworks(cfg.Integrations.ExternalFrameworks),
jobframework.WithManagerName(constants.KueueName),
jobframework.WithLabelKeysToCopy(cfg.Integrations.LabelKeysToCopy),
}
Expand Down Expand Up @@ -364,44 +361,10 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {
if err != nil {
return options, cfg, err
}

if cfg.Integrations != nil {
var errorlist field.ErrorList
managedKinds := make(sets.Set[string])
availableFrameworks := jobframework.GetIntegrationsList()
path := field.NewPath("integrations", "frameworks")
for _, framework := range cfg.Integrations.Frameworks {
if cb, found := jobframework.GetIntegration(framework); !found {
errorlist = append(errorlist, field.NotSupported(path, framework, availableFrameworks))
} else {
if gvk, err := apiutil.GVKForObject(cb.JobType, scheme); err == nil {
managedKinds = managedKinds.Insert(gvk.String())
}
}
}

path = field.NewPath("integrations", "externalFrameworks")
for idx, name := range cfg.Integrations.ExternalFrameworks {
if err := jobframework.RegisterExternalJobType(name); err == nil {
gvk, _ := schema.ParseKindArg(name)
if managedKinds.Has(gvk.String()) {
errorlist = append(errorlist, field.Duplicate(path.Index(idx), name))
}
managedKinds = managedKinds.Insert(gvk.String())
}
}

if len(errorlist) > 0 {
err := errorlist.ToAggregate()
return options, cfg, err
}
}

cfgStr, err := config.Encode(scheme, &cfg)
if err != nil {
return options, cfg, err
}
setupLog.Info("Successfully loaded configuration", "config", cfgStr)

return options, cfg, nil
}
169 changes: 0 additions & 169 deletions cmd/kueue/main_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
return options, cfg, err
}
}
if err := validate(&cfg).ToAggregate(); err != nil {
if err := validate(&cfg, scheme).ToAggregate(); err != nil {
return options, cfg, err
}
addTo(&options, &cfg)
Expand Down
26 changes: 17 additions & 9 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
_ "sigs.k8s.io/kueue/pkg/controller/jobs"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
)

func TestLoad(t *testing.T) {
test_scheme := runtime.NewScheme()
err := configapi.AddToScheme(test_scheme)
testScheme := runtime.NewScheme()
err := configapi.AddToScheme(testScheme)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -213,16 +214,20 @@ clientConnection:
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}

integrationsConfig := filepath.Join(tmpDir, "integrations.yaml")
if err := os.WriteFile(integrationsConfig, []byte(`
apiVersion: config.kueue.x-k8s.io/v1beta1
kind: Configuration
integrations:
frameworks:
- batch/job
externalFrameworks:
- Foo.v1.example.com
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}

queueVisibilityConfig := filepath.Join(tmpDir, "queueVisibility.yaml")
if err := os.WriteFile(queueVisibilityConfig, []byte(`
apiVersion: config.kueue.x-k8s.io/v1beta1
Expand All @@ -234,6 +239,7 @@ queueVisibility:
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}

podIntegrationOptionsConfig := filepath.Join(tmpDir, "podIntegrationOptions.yaml")
if err := os.WriteFile(podIntegrationOptionsConfig, []byte(`
apiVersion: config.kueue.x-k8s.io/v1beta1
Expand Down Expand Up @@ -268,6 +274,7 @@ multiKueue:
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}

defaultControlOptions := ctrl.Options{
HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress,
Metrics: metricsserver.Options{
Expand Down Expand Up @@ -656,7 +663,8 @@ multiKueue:
Integrations: &configapi.Integrations{
// referencing job.FrameworkName ensures the link of job package
// therefore the batch/framework should be registered
Frameworks: []string{job.FrameworkName},
Frameworks: []string{job.FrameworkName},
ExternalFrameworks: []string{"Foo.v1.example.com"},
PodOptions: &configapi.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -814,7 +822,7 @@ multiKueue:

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
options, cfg, err := Load(test_scheme, tc.configFile)
options, cfg, err := Load(testScheme, tc.configFile)
if tc.wantError == nil {
if err != nil {
t.Errorf("Unexpected error:%s", err)
Expand All @@ -835,14 +843,14 @@ multiKueue:
}

func TestEncode(t *testing.T) {
test_scheme := runtime.NewScheme()
err := configapi.AddToScheme(test_scheme)
testScheme := runtime.NewScheme()
err := configapi.AddToScheme(testScheme)
if err != nil {
t.Fatal(err)
}

defaultConfig := &configapi.Configuration{}
test_scheme.Default(defaultConfig)
testScheme.Default(defaultConfig)

testcases := []struct {
name string
Expand All @@ -853,7 +861,7 @@ func TestEncode(t *testing.T) {

{
name: "empty",
scheme: test_scheme,
scheme: testScheme,
cfg: &configapi.Configuration{},
wantResult: map[string]any{
"apiVersion": "config.kueue.x-k8s.io/v1beta1",
Expand All @@ -866,7 +874,7 @@ func TestEncode(t *testing.T) {
},
{
name: "default",
scheme: test_scheme,
scheme: testScheme,
cfg: defaultConfig,
wantResult: map[string]any{
"apiVersion": "config.kueue.x-k8s.io/v1beta1",
Expand Down
Loading

0 comments on commit 156529d

Please sign in to comment.