Skip to content

Commit

Permalink
consistently use central logger instance in all modules
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jan 5, 2024
1 parent 06188a7 commit 6617766
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 32 deletions.
14 changes: 8 additions & 6 deletions pkg/coordinator/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

type ClientPool struct {
logger logrus.FieldLogger
consensusPool *consensus.Pool
executionPool *execution.Pool
clients []*PoolClient
Expand All @@ -32,24 +33,25 @@ type ClientConfig struct {
ExecutionHeaders map[string]string `yaml:"executionHeaders"`
}

func NewClientPool() (*ClientPool, error) {
func NewClientPool(logger logrus.FieldLogger) (*ClientPool, error) {
consensusPool, err := consensus.NewPool(&consensus.PoolConfig{
FollowDistance: 10,
ForkDistance: 1,
})
}, logger.WithField("module", "consensus"))
if err != nil {
return nil, fmt.Errorf("could not init consensus pool: %w", err)
}

executionPool, err := execution.NewPool(&execution.PoolConfig{
FollowDistance: 10,
ForkDistance: 1,
})
}, logger.WithField("module", "execution"))
if err != nil {
return nil, fmt.Errorf("could not init execution pool: %w", err)
}

return &ClientPool{
logger: logger.WithField("module", "clients"),
consensusPool: consensusPool,
executionPool: executionPool,
clients: make([]*PoolClient, 0),
Expand Down Expand Up @@ -95,19 +97,19 @@ func (pool *ClientPool) processConsensusBlockNotification(poolClient *PoolClient
for block := range subscription.Channel() {
versionedBlock := block.AwaitBlock(context.Background(), 2*time.Second)
if versionedBlock == nil {
logrus.Warnf("cl/el block notification failed: AwaitBlock timeout (client: %v, slot: %v, root: 0x%x)", poolClient.Config.Name, block.Slot, block.Root)
pool.logger.Warnf("cl/el block notification failed: AwaitBlock timeout (client: %v, slot: %v, root: 0x%x)", poolClient.Config.Name, block.Slot, block.Root)
break
}

hash, err := versionedBlock.ExecutionBlockHash()
if err != nil {
logrus.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
break
}

number, err := versionedBlock.ExecutionBlockNumber()
if err != nil {
logrus.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
pool.logger.Warnf("cl/el block notification failed: %s (client: %v, slot: %v, root: 0x%x)", err, poolClient.Config.Name, block.Slot, block.Root)
break
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/coordinator/clients/consensus/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BlockCache struct {
wallclockSlotDispatcher Dispatcher[*ethwallclock.Slot]
}

func NewBlockCache(followDistance uint64) (*BlockCache, error) {
func NewBlockCache(logger logrus.FieldLogger, followDistance uint64) (*BlockCache, error) {
if followDistance == 0 {
return nil, fmt.Errorf("cannot initialize block cache without follow distance")
}
Expand All @@ -52,7 +52,7 @@ func NewBlockCache(followDistance uint64) (*BlockCache, error) {
go func() {
defer func() {
if err := recover(); err != nil {
logrus.WithError(err.(error)).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
logger.WithError(err.(error)).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()
cache.runCacheCleanup()
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/clients/consensus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (pool *Pool) newPoolClient(clientIdx uint16, endpoint *ClientConfig) (*Clie
clientIdx: clientIdx,
endpointConfig: endpoint,
rpcClient: rpcClient,
logger: logrus.WithField("client", endpoint.Name),
logger: pool.logger.WithField("client", endpoint.Name),
}
client.resetContext()

Expand Down
3 changes: 1 addition & 2 deletions pkg/coordinator/clients/consensus/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/assertoor/pkg/coordinator/clients/consensus/rpc"
"github.com/sirupsen/logrus"
)

func (client *Client) runClientLoop() {
defer func() {
if err := recover(); err != nil {
logrus.WithError(err.(error)).Errorf("uncaught panic in PoolClient.runPoolClientLoop subroutine: %v, stack: %v", err, string(debug.Stack()))
client.logger.WithError(err.(error)).Errorf("uncaught panic in PoolClient.runPoolClientLoop subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()

Expand Down
8 changes: 6 additions & 2 deletions pkg/coordinator/clients/consensus/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package consensus
import (
"fmt"
"sync"

"github.com/sirupsen/logrus"
)

type SchedulerMode uint8
Expand All @@ -19,6 +21,7 @@ type PoolConfig struct {

type Pool struct {
config *PoolConfig
logger logrus.FieldLogger
clientCounter uint16
clients []*Client
blockCache *BlockCache
Expand All @@ -30,11 +33,12 @@ type Pool struct {
rrLastIndexes map[ClientType]uint16
}

func NewPool(config *PoolConfig) (*Pool, error) {
func NewPool(config *PoolConfig, logger logrus.FieldLogger) (*Pool, error) {
var err error

pool := Pool{
config: config,
logger: logger,
clients: make([]*Client, 0),
forkCache: map[int64][]*HeadFork{},
rrLastIndexes: map[ClientType]uint16{},
Expand All @@ -47,7 +51,7 @@ func NewPool(config *PoolConfig) (*Pool, error) {
return nil, fmt.Errorf("unknown pool schedulerMode: %v", config.SchedulerMode)
}

pool.blockCache, err = NewBlockCache(config.FollowDistance)
pool.blockCache, err = NewBlockCache(logger, config.FollowDistance)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/coordinator/clients/execution/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type BlockCache struct {
blockDispatcher Dispatcher[*Block]
}

func NewBlockCache(followDistance uint64) (*BlockCache, error) {
func NewBlockCache(logger logrus.FieldLogger, followDistance uint64) (*BlockCache, error) {
if followDistance == 0 {
return nil, fmt.Errorf("cannot initialize block cache without follow distance")
}
Expand All @@ -40,7 +40,7 @@ func NewBlockCache(followDistance uint64) (*BlockCache, error) {
go func() {
defer func() {
if err := recover(); err != nil {
logrus.WithError(err.(error)).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
logger.WithError(err.(error)).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()
cache.runCacheCleanup()
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/clients/execution/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (pool *Pool) newPoolClient(clientIdx uint16, endpoint *ClientConfig) (*Clie
endpointConfig: endpoint,
rpcClient: rpcClient,
updateChan: make(chan *clientBlockNotification, 10),
logger: logrus.WithField("client", endpoint.Name),
logger: pool.logger.WithField("client", endpoint.Name),
}
client.resetContext()

Expand Down
3 changes: 1 addition & 2 deletions pkg/coordinator/clients/execution/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"
)

func (client *Client) runClientLoop() {
defer func() {
if err := recover(); err != nil {
logrus.WithError(err.(error)).Errorf("uncaught panic in executionClient.runClientLoop subroutine: %v, stack: %v", err, string(debug.Stack()))
client.logger.WithError(err.(error)).Errorf("uncaught panic in executionClient.runClientLoop subroutine: %v, stack: %v", err, string(debug.Stack()))
}
}()

Expand Down
8 changes: 6 additions & 2 deletions pkg/coordinator/clients/execution/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package execution
import (
"fmt"
"sync"

"github.com/sirupsen/logrus"
)

type SchedulerMode uint8
Expand All @@ -19,6 +21,7 @@ type PoolConfig struct {

type Pool struct {
config *PoolConfig
logger logrus.FieldLogger
clientCounter uint16
clients []*Client
blockCache *BlockCache
Expand All @@ -30,9 +33,10 @@ type Pool struct {
rrLastIndexes map[ClientType]uint16
}

func NewPool(config *PoolConfig) (*Pool, error) {
func NewPool(config *PoolConfig, logger logrus.FieldLogger) (*Pool, error) {
pool := Pool{
config: config,
logger: logger,
clients: make([]*Client, 0),
forkCache: map[int64][]*HeadFork{},
rrLastIndexes: map[ClientType]uint16{},
Expand All @@ -47,7 +51,7 @@ func NewPool(config *PoolConfig) (*Pool, error) {
return nil, fmt.Errorf("unknown pool schedulerMode: %v", config.SchedulerMode)
}

pool.blockCache, err = NewBlockCache(config.FollowDistance)
pool.blockCache, err = NewBlockCache(logger, config.FollowDistance)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
Info("starting coordinator")

// init client pool
clientPool, err := clients.NewClientPool()
clientPool, err := clients.NewClientPool(c.log)
if err != nil {
return err
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
}

// load validator names
c.validatorNames = names.NewValidatorNames(c.Config.ValidatorNames)
c.validatorNames = names.NewValidatorNames(c.Config.ValidatorNames, c.log)
c.validatorNames.LoadValidatorNames()

// load tests
Expand Down
20 changes: 10 additions & 10 deletions pkg/coordinator/names/validatornames.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ import (
"gopkg.in/yaml.v3"
)

var logger = logrus.StandardLogger().WithField("module", "names")

type ValidatorNames struct {
config *Config
logger logrus.FieldLogger
namesMutex sync.RWMutex
names map[uint64]string
}

func NewValidatorNames(config *Config) *ValidatorNames {
func NewValidatorNames(config *Config, logger logrus.FieldLogger) *ValidatorNames {
return &ValidatorNames{
config: config,
logger: logger.WithField("module", "names"),
}
}

Expand All @@ -53,21 +53,21 @@ func (vn *ValidatorNames) LoadValidatorNames() {
if vn.config.InventoryYaml != "" {
err := vn.loadFromYaml(vn.config.InventoryYaml)
if err != nil {
logger.WithError(err).Errorf("error while loading validator names from yaml")
vn.logger.WithError(err).Errorf("error while loading validator names from yaml")
}
}

if vn.config.InventoryURL != "" {
err := vn.loadFromRangesAPI(vn.config.InventoryURL)
if err != nil {
logger.WithError(err).Errorf("error while loading validator names inventory")
vn.logger.WithError(err).Errorf("error while loading validator names inventory")
}
}

if vn.config.Inventory != nil {
nameCount := vn.parseNamesMap(vn.config.Inventory)
if nameCount > 0 {
logger.Infof("loaded %v validator names from config", nameCount)
vn.logger.Infof("loaded %v validator names from config", nameCount)
}
}
}
Expand All @@ -88,7 +88,7 @@ func (vn *ValidatorNames) loadFromYaml(fileName string) error {
}

nameCount := vn.parseNamesMap(namesYaml)
logger.Infof("loaded %v validator names from yaml (%v)", nameCount, fileName)
vn.logger.Infof("loaded %v validator names from yaml (%v)", nameCount, fileName)

return nil
}
Expand Down Expand Up @@ -126,7 +126,7 @@ type validatorNamesRangesResponse struct {
}

func (vn *ValidatorNames) loadFromRangesAPI(apiURL string) error {
logger.Debugf("Loading validator names from inventory: %v", apiURL)
vn.logger.Debugf("Loading validator names from inventory: %v", apiURL)

client := &http.Client{Timeout: time.Second * 120}
resp, err := client.Get(apiURL)
Expand All @@ -139,7 +139,7 @@ func (vn *ValidatorNames) loadFromRangesAPI(apiURL string) error {

if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
logger.Errorf("could not fetch inventory (%v): not found", getRedactedURL(apiURL))
vn.logger.Errorf("could not fetch inventory (%v): not found", getRedactedURL(apiURL))
return nil
}

Expand Down Expand Up @@ -180,7 +180,7 @@ func (vn *ValidatorNames) loadFromRangesAPI(apiURL string) error {
}
}

logger.Infof("loaded %v validator names from inventory api (%v)", nameCount, getRedactedURL(apiURL))
vn.logger.Infof("loaded %v validator names from inventory api (%v)", nameCount, getRedactedURL(apiURL))

return nil
}
Expand Down

0 comments on commit 6617766

Please sign in to comment.