From e69fd6699405df02ef3b4989861d20e49a626c11 Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 3 Jan 2024 22:54:10 +0100 Subject: [PATCH] added `check_consensus_forks` task --- .../tasks/check_consensus_forks/config.go | 18 +++ .../tasks/check_consensus_forks/task.go | 139 ++++++++++++++++++ pkg/coordinator/tasks/tasks.go | 2 + 3 files changed, 159 insertions(+) create mode 100644 pkg/coordinator/tasks/check_consensus_forks/config.go create mode 100644 pkg/coordinator/tasks/check_consensus_forks/task.go diff --git a/pkg/coordinator/tasks/check_consensus_forks/config.go b/pkg/coordinator/tasks/check_consensus_forks/config.go new file mode 100644 index 0000000..fd76ff4 --- /dev/null +++ b/pkg/coordinator/tasks/check_consensus_forks/config.go @@ -0,0 +1,18 @@ +package checkconsensusforks + +type Config struct { + MinCheckEpochCount uint64 `yaml:"minCheckEpochCount" json:"minCheckEpochCount"` + MaxForkDistance uint64 `yaml:"maxForkDistance" json:"maxForkDistance"` + MaxForkCount uint64 `yaml:"maxForkCount" json:"maxForkCount"` +} + +func DefaultConfig() Config { + return Config{ + MinCheckEpochCount: 1, + MaxForkDistance: 1, + } +} + +func (c *Config) Validate() error { + return nil +} diff --git a/pkg/coordinator/tasks/check_consensus_forks/task.go b/pkg/coordinator/tasks/check_consensus_forks/task.go new file mode 100644 index 0000000..c8c813a --- /dev/null +++ b/pkg/coordinator/tasks/check_consensus_forks/task.go @@ -0,0 +1,139 @@ +package checkconsensusforks + +import ( + "context" + "fmt" + "time" + + "github.com/ethpandaops/assertoor/pkg/coordinator/types" + "github.com/sirupsen/logrus" +) + +var ( + TaskName = "check_consensus_forks" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Check for consensus layer forks.", + Config: DefaultConfig(), + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger + startEpoch uint64 +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + }, nil +} + +func (t *Task) Name() string { + return TaskName +} + +func (t *Task) Description() string { + return TaskDescriptor.Description +} + +func (t *Task) Title() string { + return t.ctx.Vars.ResolvePlaceholders(t.options.Title) +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Logger() logrus.FieldLogger { + return t.logger +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func (t *Task) LoadConfig() error { + config := DefaultConfig() + + // parse static config + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + // load dynamic vars + err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars) + if err != nil { + return err + } + + // validate config + if err := config.Validate(); err != nil { + return err + } + + t.config = config + + return nil +} + +func (t *Task) Execute(ctx context.Context) error { + consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10) + + defer blockSubscription.Unsubscribe() + + _, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() + if err != nil { + return fmt.Errorf("failed fetching wallclock: %w", err) + } + + t.startEpoch = currentEpoch.Number() + + for { + select { + case <-blockSubscription.Channel(): + t.ctx.SetResult(t.runCheck()) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (t *Task) runCheck() types.TaskResult { + consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + + headForks := consensusPool.GetHeadForks(int64(t.config.MaxForkDistance)) + if len(headForks)-1 > int(t.config.MaxForkCount) { + t.logger.Warnf("check failed: too many forks. (have: %v, want <= %v)", len(headForks)-1, t.config.MaxForkCount) + + for idx, fork := range headForks { + t.logger.Infof("Fork #%v: %v [0x%x] (%v clients)", idx, fork.Slot, fork.Root, len(fork.AllClients)) + } + + return types.TaskResultFailure + } + + _, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() + if err != nil { + t.logger.Warnf("check missed: could not get current epoch from wall clock") + return types.TaskResultNone + } + + epochCount := currentEpoch.Number() - t.startEpoch + + if t.config.MinCheckEpochCount > 0 && epochCount < t.config.MinCheckEpochCount { + t.logger.Warnf("Check missed: checked %v epochs, but need >= %v", epochCount, t.config.MinCheckEpochCount) + return types.TaskResultNone + } + + return types.TaskResultSuccess +} diff --git a/pkg/coordinator/tasks/tasks.go b/pkg/coordinator/tasks/tasks.go index fece00c..5e7188a 100644 --- a/pkg/coordinator/tasks/tasks.go +++ b/pkg/coordinator/tasks/tasks.go @@ -7,6 +7,7 @@ import ( checkconsensusattestationstats "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_attestation_stats" checkconsensusblockproposals "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_block_proposals" checkconsensusfinality "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_finality" + checkconsensusforks "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_forks" checkconsensusproposerduty "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_proposer_duty" checkconsensusreorgs "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_reorgs" checkconsensussyncstatus "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_sync_status" @@ -33,6 +34,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ checkconsensusattestationstats.TaskDescriptor, checkconsensusblockproposals.TaskDescriptor, checkconsensusfinality.TaskDescriptor, + checkconsensusforks.TaskDescriptor, checkconsensusproposerduty.TaskDescriptor, checkconsensusreorgs.TaskDescriptor, checkconsensussyncstatus.TaskDescriptor,