Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Feb 5, 2024
1 parent 2a891fb commit c3e3950
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 32 deletions.
34 changes: 20 additions & 14 deletions pkg/coordinator/tasks/run_task_matrix/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,28 +127,34 @@ func (t *Task) Execute(ctx context.Context) error {

t.taskCtx = taskCtx

var taskWaitChan chan bool
if !t.config.RunConcurrent {
taskWaitChan = make(chan bool, 1)
}
var currentTaskWaitChan, previousTaskWaitChan chan bool

// start child tasks
for i := range t.tasks {
taskWaitGroup.Add(1)

t.taskIdxMap[t.tasks[i]] = i

if taskWaitChan != nil {
taskWaitChan <- true
if !t.config.RunConcurrent {
previousTaskWaitChan = currentTaskWaitChan
currentTaskWaitChan = make(chan bool)
}

go func(i int) {
t.taskIdxMap[t.tasks[i]] = i

go func(i int, taskWaitChan, prevTaskWaitChan chan bool) {
defer taskWaitGroup.Done()

if taskWaitChan != nil {
defer func() {
<-taskWaitChan
}()
if !t.config.RunConcurrent {
if prevTaskWaitChan != nil {
// wait for previous task to be executed
select {
case <-prevTaskWaitChan:
case <-ctx.Done():
return
}
}

// allow next task to run once this finishes
defer close(taskWaitChan)
}

task := t.tasks[i]
Expand All @@ -161,7 +167,7 @@ func (t *Task) Execute(ctx context.Context) error {

//nolint:errcheck // ignore
t.ctx.Scheduler.ExecuteTask(taskCtx, task, t.watchChildTask)
}(i)
}(i, currentTaskWaitChan, previousTaskWaitChan)
}

// watch result updates
Expand Down
4 changes: 4 additions & 0 deletions pkg/coordinator/tasks/run_task_options/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ The `run_task_options` task is designed to execute a single task with configurab
- **`task`**:\
The task to be executed. This is defined following the standard task definition format.

- **`propagateResult`**:\
This setting controls how the result of the child task influences the result of the `run_task_options` task. If set to `true`, any change in the result of the child task (success or failure) is immediately reflected in the result of the parent `run_task_options` task. If `false`, the child task's result is only propagated to the parent task after the child task has completed its execution.

- **`exitOnResult`**:\
If set to `true`, the task will cancel the child task as soon as it sets a result, whether it is "success" or "failure." This option is useful for scenarios where immediate response to the child task's result is necessary.

Expand Down Expand Up @@ -37,6 +40,7 @@ Default settings for the `run_task_options` task:
- name: run_task_options
config:
task: null
propagateResult: false
exitOnResult: false
invertResult: false
expectFailure: false
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/tasks/run_task_options/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Config struct {
Task *helper.RawMessage `yaml:"task" json:"tasks"`
PropagateResult bool `yaml:"propagateResult" json:"propagateResult"`
ExitOnResult bool `yaml:"exitOnResult" json:"exitOnResult"`
InvertResult bool `yaml:"invertResult" json:"invertResult"`
ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"`
Expand Down
40 changes: 25 additions & 15 deletions pkg/coordinator/tasks/run_task_options/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ var (
)

type Task struct {
ctx *types.TaskContext
options *types.TaskOptions
config Config
logger logrus.FieldLogger
task types.Task
ctx *types.TaskContext
options *types.TaskOptions
config Config
logger logrus.FieldLogger
task types.Task
taskResult types.TaskResult
}

func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) {
Expand Down Expand Up @@ -86,6 +87,8 @@ func (t *Task) LoadConfig() error {
}

func (t *Task) Execute(ctx context.Context) error {
var taskErr error

retryCount := uint(0)

for {
Expand All @@ -106,37 +109,41 @@ func (t *Task) Execute(ctx context.Context) error {
}

// execute task
err = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) {
taskErr = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) {
t.watchTaskResult(ctx, cancelFn)
})

switch {
case t.config.RetryOnFailure && retryCount < t.config.MaxRetryCount:
if err != nil {
if taskErr != nil {
retryCount++

t.logger.Warnf("child task failed: %w (retrying)", err)
t.logger.Warnf("child task failed: %w (retrying)", taskErr)

continue
}
case t.config.ExpectFailure:
if err == nil {
if taskErr == nil {
return fmt.Errorf("child task succeeded, but should have failed")
}
case t.config.IgnoreFailure:
if err != nil {
t.logger.Warnf("child task failed: %w", err)
if taskErr != nil {
t.logger.Warnf("child task failed: %w", taskErr)
}
default:
if err != nil {
return fmt.Errorf("child task failed: %w", err)
if taskErr != nil {
return fmt.Errorf("child task failed: %w", taskErr)
}
}

break
}

return nil
if !t.config.PropagateResult && t.taskResult != types.TaskResultNone {
t.ctx.SetResult(t.taskResult)
}

return taskErr
}

func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc) {
Expand Down Expand Up @@ -173,7 +180,10 @@ func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc)
}
}

t.ctx.SetResult(taskResult)
t.taskResult = taskResult
if t.config.PropagateResult {
t.ctx.SetResult(taskResult)
}

if t.config.ExitOnResult {
cancelFn()
Expand Down
6 changes: 5 additions & 1 deletion pkg/coordinator/tasks/run_tasks/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## `run_tasks` Task

### Description
The `run_tasks` task is designed for executing a series of tasks sequentially, ensuring each task is completed before starting the next. This setup is essential for tests requiring a specific order of task execution.
The `run_tasks` task executes a series of specified tasks sequentially. This is particularly useful for scenarios where tasks need to be performed in a specific order, with the outcome of one potentially affecting the subsequent ones.

#### Task Behavior
- The task starts the child tasks one after the other in the order they are listed.
Expand All @@ -15,6 +15,9 @@ An important aspect of this task is that it cancels tasks once they return a res
- **`tasks`**:\
An array of tasks to be executed one after the other. Each task is defined according to the standard task structure.

- **`stopChildOnResult`**:\
If set to `true`, each child task in the sequence is stopped as soon as it sets a result (either "success" or "failure"). This ensures that once a task has reached a outcome, it does not continue to run unnecessarily, allowing the next task in the sequence to commence.

- **`expectFailure`**:\
If set to `true`, this option expects each task in the sequence to fail. The task sequence stops with a "failure" result if any task does not fail as expected.

Expand All @@ -29,6 +32,7 @@ Default settings for the `run_tasks` task:
- name: run_tasks
config:
tasks: []
stopChildOnResult: true
expectFailure: false
continueOnFailure: false
```
4 changes: 3 additions & 1 deletion pkg/coordinator/tasks/run_tasks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (

type Config struct {
Tasks []helper.RawMessage `yaml:"tasks" json:"tasks"`
StopChildOnResult bool `yaml:"stopChildOnResult" json:"stopChildOnResult"`
ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"`
ContinueOnFailure bool `yaml:"continueOnFailure" json:"continueOnFailure"`
}

func DefaultConfig() Config {
return Config{
Tasks: []helper.RawMessage{},
Tasks: []helper.RawMessage{},
StopChildOnResult: true,
}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/coordinator/tasks/run_tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ func (t *Task) LoadConfig() error {

func (t *Task) Execute(ctx context.Context) error {
for i, task := range t.tasks {
err := t.ctx.Scheduler.ExecuteTask(ctx, task, t.ctx.Scheduler.WatchTaskPass)
err := t.ctx.Scheduler.ExecuteTask(ctx, task, func(ctx context.Context, cancelFn context.CancelFunc, task types.Task) {
if t.config.StopChildOnResult {
t.ctx.Scheduler.WatchTaskPass(ctx, cancelFn, task)
}
})

switch {
case t.config.ExpectFailure:
Expand Down

0 comments on commit c3e3950

Please sign in to comment.