Skip to content

Commit

Permalink
added check_consensus_forks task
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jan 3, 2024
1 parent bd5b25e commit e69fd66
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/coordinator/tasks/check_consensus_forks/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package checkconsensusforks

type Config struct {
MinCheckEpochCount uint64 `yaml:"minCheckEpochCount" json:"minCheckEpochCount"`
MaxForkDistance uint64 `yaml:"maxForkDistance" json:"maxForkDistance"`
MaxForkCount uint64 `yaml:"maxForkCount" json:"maxForkCount"`
}

func DefaultConfig() Config {
return Config{
MinCheckEpochCount: 1,
MaxForkDistance: 1,
}
}

func (c *Config) Validate() error {
return nil
}
139 changes: 139 additions & 0 deletions pkg/coordinator/tasks/check_consensus_forks/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package checkconsensusforks

import (
"context"
"fmt"
"time"

"github.com/ethpandaops/assertoor/pkg/coordinator/types"
"github.com/sirupsen/logrus"
)

var (
TaskName = "check_consensus_forks"
TaskDescriptor = &types.TaskDescriptor{
Name: TaskName,
Description: "Check for consensus layer forks.",
Config: DefaultConfig(),
NewTask: NewTask,
}
)

type Task struct {
ctx *types.TaskContext
options *types.TaskOptions
config Config
logger logrus.FieldLogger
startEpoch uint64
}

func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) {
return &Task{
ctx: ctx,
options: options,
logger: ctx.Logger.GetLogger(),
}, nil
}

func (t *Task) Name() string {
return TaskName
}

func (t *Task) Description() string {
return TaskDescriptor.Description
}

func (t *Task) Title() string {
return t.ctx.Vars.ResolvePlaceholders(t.options.Title)
}

func (t *Task) Config() interface{} {
return t.config
}

func (t *Task) Logger() logrus.FieldLogger {
return t.logger
}

func (t *Task) Timeout() time.Duration {
return t.options.Timeout.Duration
}

func (t *Task) LoadConfig() error {
config := DefaultConfig()

// parse static config
if t.options.Config != nil {
if err := t.options.Config.Unmarshal(&config); err != nil {
return fmt.Errorf("error parsing task config for %v: %w", TaskName, err)
}
}

// load dynamic vars
err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars)
if err != nil {
return err
}

// validate config
if err := config.Validate(); err != nil {
return err
}

t.config = config

return nil
}

func (t *Task) Execute(ctx context.Context) error {
consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool()
blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10)

defer blockSubscription.Unsubscribe()

_, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now()
if err != nil {
return fmt.Errorf("failed fetching wallclock: %w", err)
}

t.startEpoch = currentEpoch.Number()

for {
select {
case <-blockSubscription.Channel():
t.ctx.SetResult(t.runCheck())
case <-ctx.Done():
return ctx.Err()
}
}
}

func (t *Task) runCheck() types.TaskResult {
consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool()

headForks := consensusPool.GetHeadForks(int64(t.config.MaxForkDistance))
if len(headForks)-1 > int(t.config.MaxForkCount) {
t.logger.Warnf("check failed: too many forks. (have: %v, want <= %v)", len(headForks)-1, t.config.MaxForkCount)

for idx, fork := range headForks {
t.logger.Infof("Fork #%v: %v [0x%x] (%v clients)", idx, fork.Slot, fork.Root, len(fork.AllClients))
}

return types.TaskResultFailure
}

_, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now()
if err != nil {
t.logger.Warnf("check missed: could not get current epoch from wall clock")
return types.TaskResultNone
}

epochCount := currentEpoch.Number() - t.startEpoch

if t.config.MinCheckEpochCount > 0 && epochCount < t.config.MinCheckEpochCount {
t.logger.Warnf("Check missed: checked %v epochs, but need >= %v", epochCount, t.config.MinCheckEpochCount)
return types.TaskResultNone
}

return types.TaskResultSuccess
}
2 changes: 2 additions & 0 deletions pkg/coordinator/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
checkconsensusattestationstats "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_attestation_stats"
checkconsensusblockproposals "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_block_proposals"
checkconsensusfinality "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_finality"
checkconsensusforks "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_forks"
checkconsensusproposerduty "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_proposer_duty"
checkconsensusreorgs "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_reorgs"
checkconsensussyncstatus "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_sync_status"
Expand All @@ -33,6 +34,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{
checkconsensusattestationstats.TaskDescriptor,
checkconsensusblockproposals.TaskDescriptor,
checkconsensusfinality.TaskDescriptor,
checkconsensusforks.TaskDescriptor,
checkconsensusproposerduty.TaskDescriptor,
checkconsensusreorgs.TaskDescriptor,
checkconsensussyncstatus.TaskDescriptor,
Expand Down

0 comments on commit e69fd66

Please sign in to comment.