From c3e395074a86c86c3747de4d54b4e246c2954f27 Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 5 Feb 2024 18:43:48 +0100 Subject: [PATCH] fixes --- pkg/coordinator/tasks/run_task_matrix/task.go | 34 +++++++++------- .../tasks/run_task_options/README.md | 4 ++ .../tasks/run_task_options/config.go | 1 + .../tasks/run_task_options/task.go | 40 ++++++++++++------- pkg/coordinator/tasks/run_tasks/README.md | 6 ++- pkg/coordinator/tasks/run_tasks/config.go | 4 +- pkg/coordinator/tasks/run_tasks/task.go | 6 ++- 7 files changed, 63 insertions(+), 32 deletions(-) diff --git a/pkg/coordinator/tasks/run_task_matrix/task.go b/pkg/coordinator/tasks/run_task_matrix/task.go index 1463cb0..f70042f 100644 --- a/pkg/coordinator/tasks/run_task_matrix/task.go +++ b/pkg/coordinator/tasks/run_task_matrix/task.go @@ -127,28 +127,34 @@ func (t *Task) Execute(ctx context.Context) error { t.taskCtx = taskCtx - var taskWaitChan chan bool - if !t.config.RunConcurrent { - taskWaitChan = make(chan bool, 1) - } + var currentTaskWaitChan, previousTaskWaitChan chan bool // start child tasks for i := range t.tasks { taskWaitGroup.Add(1) - t.taskIdxMap[t.tasks[i]] = i - - if taskWaitChan != nil { - taskWaitChan <- true + if !t.config.RunConcurrent { + previousTaskWaitChan = currentTaskWaitChan + currentTaskWaitChan = make(chan bool) } - go func(i int) { + t.taskIdxMap[t.tasks[i]] = i + + go func(i int, taskWaitChan, prevTaskWaitChan chan bool) { defer taskWaitGroup.Done() - if taskWaitChan != nil { - defer func() { - <-taskWaitChan - }() + if !t.config.RunConcurrent { + if prevTaskWaitChan != nil { + // wait for previous task to be executed + select { + case <-prevTaskWaitChan: + case <-ctx.Done(): + return + } + } + + // allow next task to run once this finishes + defer close(taskWaitChan) } task := t.tasks[i] @@ -161,7 +167,7 @@ func (t *Task) Execute(ctx context.Context) error { //nolint:errcheck // ignore t.ctx.Scheduler.ExecuteTask(taskCtx, task, t.watchChildTask) - }(i) + }(i, currentTaskWaitChan, previousTaskWaitChan) } // watch result updates diff --git a/pkg/coordinator/tasks/run_task_options/README.md b/pkg/coordinator/tasks/run_task_options/README.md index 4753bba..b9b1a68 100644 --- a/pkg/coordinator/tasks/run_task_options/README.md +++ b/pkg/coordinator/tasks/run_task_options/README.md @@ -8,6 +8,9 @@ The `run_task_options` task is designed to execute a single task with configurab - **`task`**:\ The task to be executed. This is defined following the standard task definition format. +- **`propagateResult`**:\ + This setting controls how the result of the child task influences the result of the `run_task_options` task. If set to `true`, any change in the result of the child task (success or failure) is immediately reflected in the result of the parent `run_task_options` task. If `false`, the child task's result is only propagated to the parent task after the child task has completed its execution. + - **`exitOnResult`**:\ If set to `true`, the task will cancel the child task as soon as it sets a result, whether it is "success" or "failure." This option is useful for scenarios where immediate response to the child task's result is necessary. @@ -37,6 +40,7 @@ Default settings for the `run_task_options` task: - name: run_task_options config: task: null + propagateResult: false exitOnResult: false invertResult: false expectFailure: false diff --git a/pkg/coordinator/tasks/run_task_options/config.go b/pkg/coordinator/tasks/run_task_options/config.go index c272ceb..814aba1 100644 --- a/pkg/coordinator/tasks/run_task_options/config.go +++ b/pkg/coordinator/tasks/run_task_options/config.go @@ -8,6 +8,7 @@ import ( type Config struct { Task *helper.RawMessage `yaml:"task" json:"tasks"` + PropagateResult bool `yaml:"propagateResult" json:"propagateResult"` ExitOnResult bool `yaml:"exitOnResult" json:"exitOnResult"` InvertResult bool `yaml:"invertResult" json:"invertResult"` ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"` diff --git a/pkg/coordinator/tasks/run_task_options/task.go b/pkg/coordinator/tasks/run_task_options/task.go index 1032bcf..5ec24b2 100644 --- a/pkg/coordinator/tasks/run_task_options/task.go +++ b/pkg/coordinator/tasks/run_task_options/task.go @@ -20,11 +20,12 @@ var ( ) type Task struct { - ctx *types.TaskContext - options *types.TaskOptions - config Config - logger logrus.FieldLogger - task types.Task + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger + task types.Task + taskResult types.TaskResult } func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { @@ -86,6 +87,8 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { + var taskErr error + retryCount := uint(0) for { @@ -106,37 +109,41 @@ func (t *Task) Execute(ctx context.Context) error { } // execute task - err = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) { + taskErr = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) { t.watchTaskResult(ctx, cancelFn) }) switch { case t.config.RetryOnFailure && retryCount < t.config.MaxRetryCount: - if err != nil { + if taskErr != nil { retryCount++ - t.logger.Warnf("child task failed: %w (retrying)", err) + t.logger.Warnf("child task failed: %w (retrying)", taskErr) continue } case t.config.ExpectFailure: - if err == nil { + if taskErr == nil { return fmt.Errorf("child task succeeded, but should have failed") } case t.config.IgnoreFailure: - if err != nil { - t.logger.Warnf("child task failed: %w", err) + if taskErr != nil { + t.logger.Warnf("child task failed: %w", taskErr) } default: - if err != nil { - return fmt.Errorf("child task failed: %w", err) + if taskErr != nil { + return fmt.Errorf("child task failed: %w", taskErr) } } break } - return nil + if !t.config.PropagateResult && t.taskResult != types.TaskResultNone { + t.ctx.SetResult(t.taskResult) + } + + return taskErr } func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc) { @@ -173,7 +180,10 @@ func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc) } } - t.ctx.SetResult(taskResult) + t.taskResult = taskResult + if t.config.PropagateResult { + t.ctx.SetResult(taskResult) + } if t.config.ExitOnResult { cancelFn() diff --git a/pkg/coordinator/tasks/run_tasks/README.md b/pkg/coordinator/tasks/run_tasks/README.md index 428fe14..3b9decd 100644 --- a/pkg/coordinator/tasks/run_tasks/README.md +++ b/pkg/coordinator/tasks/run_tasks/README.md @@ -1,7 +1,7 @@ ## `run_tasks` Task ### Description -The `run_tasks` task is designed for executing a series of tasks sequentially, ensuring each task is completed before starting the next. This setup is essential for tests requiring a specific order of task execution. +The `run_tasks` task executes a series of specified tasks sequentially. This is particularly useful for scenarios where tasks need to be performed in a specific order, with the outcome of one potentially affecting the subsequent ones. #### Task Behavior - The task starts the child tasks one after the other in the order they are listed. @@ -15,6 +15,9 @@ An important aspect of this task is that it cancels tasks once they return a res - **`tasks`**:\ An array of tasks to be executed one after the other. Each task is defined according to the standard task structure. +- **`stopChildOnResult`**:\ + If set to `true`, each child task in the sequence is stopped as soon as it sets a result (either "success" or "failure"). This ensures that once a task has reached a outcome, it does not continue to run unnecessarily, allowing the next task in the sequence to commence. + - **`expectFailure`**:\ If set to `true`, this option expects each task in the sequence to fail. The task sequence stops with a "failure" result if any task does not fail as expected. @@ -29,6 +32,7 @@ Default settings for the `run_tasks` task: - name: run_tasks config: tasks: [] + stopChildOnResult: true expectFailure: false continueOnFailure: false ``` diff --git a/pkg/coordinator/tasks/run_tasks/config.go b/pkg/coordinator/tasks/run_tasks/config.go index 7cb6bfe..f594d74 100644 --- a/pkg/coordinator/tasks/run_tasks/config.go +++ b/pkg/coordinator/tasks/run_tasks/config.go @@ -8,13 +8,15 @@ import ( type Config struct { Tasks []helper.RawMessage `yaml:"tasks" json:"tasks"` + StopChildOnResult bool `yaml:"stopChildOnResult" json:"stopChildOnResult"` ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"` ContinueOnFailure bool `yaml:"continueOnFailure" json:"continueOnFailure"` } func DefaultConfig() Config { return Config{ - Tasks: []helper.RawMessage{}, + Tasks: []helper.RawMessage{}, + StopChildOnResult: true, } } diff --git a/pkg/coordinator/tasks/run_tasks/task.go b/pkg/coordinator/tasks/run_tasks/task.go index edbf65c..77a393e 100644 --- a/pkg/coordinator/tasks/run_tasks/task.go +++ b/pkg/coordinator/tasks/run_tasks/task.go @@ -105,7 +105,11 @@ func (t *Task) LoadConfig() error { func (t *Task) Execute(ctx context.Context) error { for i, task := range t.tasks { - err := t.ctx.Scheduler.ExecuteTask(ctx, task, t.ctx.Scheduler.WatchTaskPass) + err := t.ctx.Scheduler.ExecuteTask(ctx, task, func(ctx context.Context, cancelFn context.CancelFunc, task types.Task) { + if t.config.StopChildOnResult { + t.ctx.Scheduler.WatchTaskPass(ctx, cancelFn, task) + } + }) switch { case t.config.ExpectFailure: