Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Dec 5, 2023
1 parent 7c60796 commit a522896
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 25 deletions.
29 changes: 19 additions & 10 deletions pkg/coordinator/tasks/check_consensus_sync_status/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/clients/consensus/rpc"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -102,7 +103,23 @@ func (t *Task) processCheck(ctx context.Context) {
failedClients := []string{}

for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
checkResult := t.processClientCheck(ctx, client)
var checkResult bool

checkLogger := t.logger.WithField("client", client.Config.Name)
syncStatus, err := client.ConsensusClient.GetRPCClient().GetNodeSyncStatus(ctx)

if ctx.Err() != nil {
return
}

if err != nil {
checkLogger.Warnf("errof fetching sync status: %v", err)

checkResult = false
} else {
checkResult = t.processClientCheck(client, syncStatus, checkLogger)
}

if !checkResult {
allResultsPass = false

Expand All @@ -119,15 +136,7 @@ func (t *Task) processCheck(ctx context.Context) {
}
}

func (t *Task) processClientCheck(ctx context.Context, client *clients.PoolClient) bool {
checkLogger := t.logger.WithField("client", client.Config.Name)

syncStatus, err := client.ConsensusClient.GetRPCClient().GetNodeSyncStatus(ctx)
if err != nil {
checkLogger.Warnf("errof fetching sync status: %v", err)
return false
}

func (t *Task) processClientCheck(client *clients.PoolClient, syncStatus *rpc.SyncStatus, checkLogger logrus.FieldLogger) bool {
clientIdx := client.ExecutionClient.GetIndex()
if t.firstHeight[clientIdx] == 0 {
t.firstHeight[clientIdx] = syncStatus.HeadSlot
Expand Down
29 changes: 19 additions & 10 deletions pkg/coordinator/tasks/check_execution_sync_status/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/clients/execution/rpc"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -102,7 +103,23 @@ func (t *Task) processCheck(ctx context.Context) {
failedClients := []string{}

for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
checkResult := t.processClientCheck(ctx, client)
var checkResult bool

checkLogger := t.logger.WithField("client", client.Config.Name)
syncStatus, err := client.ExecutionClient.GetRPCClient().GetNodeSyncing(ctx)

if ctx.Err() != nil {
return
}

if err != nil {
checkLogger.Warnf("errof fetching sync status: %v", err)

checkResult = false
} else {
checkResult = t.processClientCheck(client, syncStatus, checkLogger)
}

if !checkResult {
allResultsPass = false

Expand All @@ -119,15 +136,7 @@ func (t *Task) processCheck(ctx context.Context) {
}
}

func (t *Task) processClientCheck(ctx context.Context, client *clients.PoolClient) bool {
checkLogger := t.logger.WithField("client", client.Config.Name)

syncStatus, err := client.ExecutionClient.GetRPCClient().GetNodeSyncing(ctx)
if err != nil {
checkLogger.Warnf("errof fetching sync status: %v", err)
return false
}

func (t *Task) processClientCheck(client *clients.PoolClient, syncStatus *rpc.SyncStatus, checkLogger logrus.FieldLogger) bool {
currentBlock, _ := client.ExecutionClient.GetLastHead()

clientIdx := client.ExecutionClient.GetIndex()
Expand Down
11 changes: 6 additions & 5 deletions pkg/coordinator/tasks/run_tasks_concurrent/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func (t *Task) Execute(ctx context.Context) error {
failureCount = 0
pendingCount = 0

for _, result := range resultMap {
for _, task := range t.tasks {
result := resultMap[task]
switch result {
case types.TaskResultSuccess:
successCount++
Expand All @@ -181,14 +182,14 @@ func (t *Task) Execute(ctx context.Context) error {
taskComplete = true
}

if failureCount >= failureLimit {
if !taskComplete && failureCount >= failureLimit {
t.logger.Infof("failure limit reached (%v success, %v failure)", successCount, failureCount)
t.ctx.SetResult(types.TaskResultFailure)

taskComplete = true
}

if pendingCount == 0 {
if !taskComplete && pendingCount == 0 {
t.logger.Infof("all child tasks completed (%v success, %v failure)", successCount, failureCount)
t.ctx.SetResult(types.TaskResultFailure)

Expand Down Expand Up @@ -228,12 +229,12 @@ func (t *Task) watchChildTask(_ context.Context, _ context.CancelFunc, task type
taskActive = false
}

t.logger.Debugf("result update notification for task %v (%v -> %v)", t.taskIdxMap[task], oldStatus, taskStatus.Result)

if taskStatus.Result == oldStatus {
continue
}

t.logger.Debugf("result update notification for task %v (%v -> %v)", t.taskIdxMap[task], oldStatus, taskStatus.Result)

t.resultNotifyChan <- taskResultUpdate{
task: task,
result: taskStatus.Result,
Expand Down
1 change: 1 addition & 0 deletions pkg/coordinator/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func CreateTest(coordinator types.Coordinator, config *Config) (types.Test, erro
log: coordinator.Logger().WithField("component", "test").WithField("test", config.Name),
config: config,
metrics: NewMetrics("sync_test_coordinator", config.Name),
status: types.TestStatusPending,
}
if test.config.Timeout.Duration > 0 {
test.timeout = test.config.Timeout.Duration
Expand Down

0 comments on commit a522896

Please sign in to comment.