Skip to content

Commit

Permalink
Merge pull request #113 from ethpandaops/pk910/add-mev-indexer
Browse files Browse the repository at this point in the history
fix broken mev indexer
  • Loading branch information
pk910 authored Aug 26, 2024
2 parents 8446512 + b4444e9 commit 65baf7d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 35 deletions.
6 changes: 5 additions & 1 deletion indexer/beacon/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,12 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}

canonicalRoots := make([][]byte, len(canonicalBlocks))
canonicalBlockHashes := make([][]byte, len(canonicalBlocks))
for i, block := range canonicalBlocks {
canonicalRoots[i] = block.Root[:]
if blockIndex := block.GetBlockIndex(); blockIndex != nil {
canonicalBlockHashes[i] = blockIndex.ExecutionHash[:]
}
}

t1dur := time.Since(t1) - t1loading
Expand Down Expand Up @@ -330,7 +334,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
return fmt.Errorf("error persisting sync committee assignments to db: %v", err)
}

if err := db.UpdateMevBlockByEpoch(uint64(epoch), specs.SlotsPerEpoch, canonicalRoots, tx); err != nil {
if err := db.UpdateMevBlockByEpoch(uint64(epoch), specs.SlotsPerEpoch, canonicalBlockHashes, tx); err != nil {
return fmt.Errorf("error while updating mev block proposal state: %v", err)
}

Expand Down
6 changes: 5 additions & 1 deletion indexer/beacon/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last
lastSlot := chainState.EpochStartSlot(syncEpoch+2) - 1
canonicalBlocks := []*Block{}
canonicalBlockRoots := [][]byte{}
canonicalBlockHashes := [][]byte{}
nextEpochCanonicalBlocks := []*Block{}

var firstBlock *Block
Expand Down Expand Up @@ -321,6 +322,9 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last
if chainState.EpochOfSlot(slot) == syncEpoch {
canonicalBlocks = append(canonicalBlocks, sync.cachedBlocks[slot])
canonicalBlockRoots = append(canonicalBlockRoots, sync.cachedBlocks[slot].Root[:])
if blockIndex := sync.cachedBlocks[slot].GetBlockIndex(); blockIndex != nil {
canonicalBlockHashes = append(canonicalBlockHashes, blockIndex.ExecutionHash[:])
}
} else {
nextEpochCanonicalBlocks = append(nextEpochCanonicalBlocks, sync.cachedBlocks[slot])
}
Expand Down Expand Up @@ -388,7 +392,7 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last
return fmt.Errorf("error persisting sync committee assignments to db: %v", err)
}

if err := db.UpdateMevBlockByEpoch(uint64(syncEpoch), specs.SlotsPerEpoch, canonicalBlockRoots, tx); err != nil {
if err := db.UpdateMevBlockByEpoch(uint64(syncEpoch), specs.SlotsPerEpoch, canonicalBlockHashes, tx); err != nil {
return fmt.Errorf("error while updating mev block proposal state: %v", err)
}

Expand Down
35 changes: 15 additions & 20 deletions indexer/mevrelay/mevindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (mev *MevIndexer) StartUpdater() {
if mev.updaterRunning {
return
}

if utils.Config.MevIndexer.RefreshInterval == 0 {
utils.Config.MevIndexer.RefreshInterval = 10 * time.Minute
}
Expand Down Expand Up @@ -93,7 +94,7 @@ func (mev *MevIndexer) runUpdater() error {
_, finalizedEpoch := mev.beaconIndexer.GetBlockCacheState()
finalizedSlot := phase0.Slot(0)
if finalizedEpoch > 0 {
finalizedSlot = mev.chainState.EpochToSlot(finalizedEpoch) - 1
finalizedSlot = mev.chainState.EpochToSlot(finalizedEpoch)
}
loadedCount := uint64(0)
for {
Expand Down Expand Up @@ -167,25 +168,14 @@ func (mev *MevIndexer) runUpdater() error {
finalizedEpoch, _ := mev.beaconIndexer.GetBlockCacheState()
finalizedSlot := phase0.Slot(0)
if finalizedEpoch > 0 {
finalizedSlot = mev.chainState.EpochToSlot(finalizedEpoch) - 1
finalizedSlot = mev.chainState.EpochToSlot(finalizedEpoch)
}

updatedMevBlocks = []*dbtypes.MevBlock{}
for hash, cachedMevBlock := range mev.mevBlockCache {
if cachedMevBlock.block.SlotNumber < uint64(finalizedSlot) {
proposed := mev.getMevBlockProposedStatus(cachedMevBlock.block, finalizedSlot)
if cachedMevBlock.block.Proposed != proposed {
cachedMevBlock.block.Proposed = proposed
updatedMevBlocks = append(updatedMevBlocks, cachedMevBlock.block)
}

delete(mev.mevBlockCache, hash)
}
}
err = mev.updateMevBlocks(updatedMevBlocks)
if err != nil {
return err
}

mev.lastRefresh = time.Now()
return nil
Expand Down Expand Up @@ -245,14 +235,19 @@ func (mev *MevIndexer) loadMevBlocksFromRelay(relay *types.MevRelayConfig) error
}

// parse blocks
mev.mevBlockCacheMutex.Lock()
defer mev.mevBlockCacheMutex.Unlock()
validatorSet := mev.beaconIndexer.GetCanonicalValidatorSet(nil)
if len(validatorSet) == 0 {
return fmt.Errorf("validator set is empty")
}

validatorMap := map[phase0.BLSPubKey]*v1.Validator{}
for _, validator := range validatorSet {
validatorMap[validator.Validator.PublicKey] = validator
}

mev.mevBlockCacheMutex.Lock()
defer mev.mevBlockCacheMutex.Unlock()

finalizedEpoch, _ := mev.beaconIndexer.GetBlockCacheState()
finalizedSlot := phase0.Slot(0)
if finalizedEpoch > 0 {
Expand Down Expand Up @@ -376,19 +371,19 @@ func (mev *MevIndexer) loadMevBlocksFromRelay(relay *types.MevRelayConfig) error

func (mev *MevIndexer) getMevBlockProposedStatus(mevBlock *dbtypes.MevBlock, finalizedSlot phase0.Slot) uint8 {
proposed := uint8(0)
if mevBlock.SlotNumber > uint64(finalizedSlot) {
if mevBlock.SlotNumber >= uint64(finalizedSlot) {
for _, block := range mev.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(mevBlock.BlockHash)) {
if proposed != 1 && mev.beaconIndexer.IsCanonicalBlock(block, nil) {
if mev.beaconIndexer.IsCanonicalBlock(block, nil) {
proposed = 1
} else {
} else if proposed != 1 {
proposed = 2
}
}
} else {
for _, block := range db.GetSlotsByBlockHash(mevBlock.BlockHash) {
if proposed != 1 && block.Status == dbtypes.Canonical {
if block.Status == dbtypes.Canonical {
proposed = 1
} else {
} else if proposed != 1 {
proposed = 2
}
}
Expand Down
32 changes: 20 additions & 12 deletions services/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ import (
"github.com/ethpandaops/dora/dbtypes"
"github.com/ethpandaops/dora/indexer/beacon"
execindexer "github.com/ethpandaops/dora/indexer/execution"
"github.com/ethpandaops/dora/indexer/mevrelay"
"github.com/ethpandaops/dora/utils"
"github.com/sirupsen/logrus"
)

type ChainService struct {
logger logrus.FieldLogger
consensusPool *consensus.Pool
executionPool *execution.Pool
beaconIndexer *beacon.Indexer
validatorNames *ValidatorNames
started bool
logger logrus.FieldLogger
consensusPool *consensus.Pool
executionPool *execution.Pool
beaconIndexer *beacon.Indexer
validatorNames *ValidatorNames
mevRelayIndexer *mevrelay.MevIndexer
started bool
}

var GlobalBeaconService *ChainService
Expand All @@ -43,14 +45,17 @@ func InitChainService(ctx context.Context, logger logrus.FieldLogger) {
consensusPool := consensus.NewPool(ctx, logger.WithField("service", "cl-pool"))
executionPool := execution.NewPool(ctx, logger.WithField("service", "el-pool"))
beaconIndexer := beacon.NewIndexer(logger.WithField("service", "cl-indexer"), consensusPool)
validatorNames := NewValidatorNames(beaconIndexer, consensusPool.GetChainState())
chainState := consensusPool.GetChainState()
validatorNames := NewValidatorNames(beaconIndexer, chainState)
mevRelayIndexer := mevrelay.NewMevIndexer(logger.WithField("service", "mev-relay"), beaconIndexer, chainState)

GlobalBeaconService = &ChainService{
logger: logger,
consensusPool: consensusPool,
executionPool: executionPool,
beaconIndexer: beaconIndexer,
validatorNames: validatorNames,
logger: logger,
consensusPool: consensusPool,
executionPool: executionPool,
beaconIndexer: beaconIndexer,
validatorNames: validatorNames,
mevRelayIndexer: mevRelayIndexer,
}
}

Expand Down Expand Up @@ -176,6 +181,9 @@ func (cs *ChainService) StartService() error {
// add execution indexers
execindexer.NewDepositIndexer(executionIndexerCtx)

// start MEV relay indexer
cs.mevRelayIndexer.StartUpdater()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion templates/mev_blocks/mev_blocks.html
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ <h1 class="h4 mb-1 mb-md-0"><i class="fas fa-money-bill mx-2"></i>MEV Blocks</h1
<div class="row">
<div class="col-sm-12 col-md-5 table-metainfo">
<div class="px-2">
<div class="table-meta" role="status" aria-live="polite">Showing voluntary exits from slot {{ .FirstIndex }} to {{ .LastIndex }}</div>
<div class="table-meta" role="status" aria-live="polite">Showing MEV blocks from slot {{ .FirstIndex }} to {{ .LastIndex }}</div>
</div>
</div>
<div class="col-sm-12 col-md-7 table-paging">
Expand Down

0 comments on commit 65baf7d

Please sign in to comment.