Skip to content

Commit

Permalink
Merge branch 'master' into electra-support
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 13, 2024
2 parents 2a76c2e + 2b99f81 commit d207027
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 14 deletions.
25 changes: 25 additions & 0 deletions pkg/coordinator/scheduler/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ func (ts *TaskScheduler) ExecuteTask(ctx context.Context, taskIndex types.TaskIn
taskState.taskStatusVars.SetVar("running", false)
}()

// check task condition if defined
if taskState.options.If != "" {
conditionResult, _, err := taskState.taskVars.ResolveQuery(taskState.options.If)
if err != nil {
taskLogger.Errorf("task condition evaluation failed: %v", err)
taskState.setTaskResult(types.TaskResultFailure, false)

return fmt.Errorf("task condition evaluation failed: %w", err)
}

isValid, isOk := conditionResult.(bool)
if !isOk {
taskLogger.Warnf("task condition is not a boolean: %v", conditionResult)
}

if !isValid {
taskLogger.Infof("task condition not met, skipping task")

taskState.isSkipped = true
taskState.setTaskResult(types.TaskResultNone, false)

return nil
}
}

// create task control context
taskCtx := &types.TaskContext{
Scheduler: ts,
Expand Down
2 changes: 2 additions & 0 deletions pkg/coordinator/scheduler/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type taskState struct {
isCleanup bool
isStarted bool
isRunning bool
isSkipped bool
isTimeout bool
startTime time.Time
stopTime time.Time
Expand Down Expand Up @@ -149,6 +150,7 @@ func (ts *taskState) GetTaskStatus() *types.TaskStatus {
ParentIndex: 0,
IsStarted: ts.isStarted,
IsRunning: ts.isRunning,
IsSkipped: ts.isSkipped,
StartTime: ts.startTime,
StopTime: ts.stopTime,
Result: ts.taskResult,
Expand Down
21 changes: 14 additions & 7 deletions pkg/coordinator/tasks/run_task_matrix/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func (t *Task) Execute(ctx context.Context) error {
}(i, currentTaskWaitChan, previousTaskWaitChan)
}

completeChan := make(chan bool)
go func() {
taskWaitGroup.Wait()
time.Sleep(100 * time.Millisecond)
close(completeChan)
}()

// watch result updates
successLimit := t.config.SucceedTaskCount
if successLimit == 0 {
Expand Down Expand Up @@ -215,16 +222,16 @@ func (t *Task) Execute(ctx context.Context) error {
taskComplete = true
}

if !taskComplete && pendingCount == 0 {
t.logger.Infof("all child tasks completed (%v success, %v failure)", successCount, failureCount)
t.ctx.SetResult(types.TaskResultFailure)

taskComplete = true
}

if !taskComplete {
t.logger.Debugf("result update (%v success, %v failure)", successCount, failureCount)
}
case <-completeChan:
if !taskComplete {
taskComplete = true

t.ctx.SetResult(types.TaskResultSuccess)
t.logger.Infof("all child tasks completed (%v success, %v failure)", successCount, failureCount)
}
}
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/coordinator/tasks/run_tasks_concurrent/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (t *Task) Execute(ctx context.Context) error {
}(i)
}

completeChan := make(chan bool)
go func() {
taskWaitGroup.Wait()
time.Sleep(100 * time.Millisecond)
close(completeChan)
}()

// watch result updates
successLimit := t.config.SucceedTaskCount
if successLimit == 0 {
Expand Down Expand Up @@ -186,16 +193,16 @@ func (t *Task) Execute(ctx context.Context) error {
taskComplete = true
}

if !taskComplete && pendingCount == 0 {
t.logger.Infof("all child tasks completed (%v success, %v failure)", successCount, failureCount)
t.ctx.SetResult(types.TaskResultFailure)

taskComplete = true
}

if !taskComplete {
t.logger.Debugf("result update (%v success, %v failure)", successCount, failureCount)
}
case <-completeChan:
if !taskComplete {
taskComplete = true

t.ctx.SetResult(types.TaskResultSuccess)
t.logger.Infof("all child tasks completed (%v success, %v failure)", successCount, failureCount)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/coordinator/types/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type TaskOptions struct {
Timeout helper.Duration `yaml:"timeout" json:"timeout"`
// The optional id of the task (for result access via tasks.<task-id>).
ID string `yaml:"id" json:"id"`
// The optional condition to run the task.
If string `yaml:"if" json:"if"`
}

type TaskIndex uint64
Expand Down Expand Up @@ -68,6 +70,7 @@ type TaskStatus struct {
ParentIndex TaskIndex
IsStarted bool
IsRunning bool
IsSkipped bool
StartTime time.Time
StopTime time.Time
Result TaskResult
Expand Down

0 comments on commit d207027

Please sign in to comment.