Skip to content

Commit

Permalink
add logs api endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jan 11, 2024
1 parent 0803817 commit de6c41f
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 12 deletions.
32 changes: 20 additions & 12 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ethpandaops/assertoor/pkg/coordinator/buildinfo"
"github.com/ethpandaops/assertoor/pkg/coordinator/clients"
"github.com/ethpandaops/assertoor/pkg/coordinator/logger"
"github.com/ethpandaops/assertoor/pkg/coordinator/names"
"github.com/ethpandaops/assertoor/pkg/coordinator/test"
"github.com/ethpandaops/assertoor/pkg/coordinator/types"
Expand All @@ -25,7 +26,7 @@ import (
type Coordinator struct {
// Config is the coordinator configuration.
Config *Config
log logrus.FieldLogger
log *logger.LogScope
clientPool *clients.ClientPool
walletManager *wallet.Manager
webserver *server.WebServer
Expand All @@ -37,7 +38,10 @@ type Coordinator struct {

func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, lameDuckSeconds int) *Coordinator {
return &Coordinator{
log: log,
log: logger.NewLogger(&logger.ScopeOptions{
Parent: log,
HistorySize: 5000,
}),
Config: config,
tests: []types.Test{},
metricsPort: metricsPort,
Expand All @@ -47,20 +51,20 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, lameDuc

// Run executes the coordinator until completion.
func (c *Coordinator) Run(ctx context.Context) error {
c.log.
c.log.GetLogger().
WithField("build_version", buildinfo.GetVersion()).
WithField("metrics_port", c.metricsPort).
WithField("lame_duck_seconds", c.lameDuckSeconds).
Info("starting coordinator")

// init client pool
clientPool, err := clients.NewClientPool(c.log)
clientPool, err := clients.NewClientPool(c.log.GetLogger())
if err != nil {
return err
}

c.clientPool = clientPool
c.walletManager = wallet.NewManager(clientPool.GetExecutionPool(), c.log.WithField("module", "wallet"))
c.walletManager = wallet.NewManager(clientPool.GetExecutionPool(), c.log.GetLogger().WithField("module", "wallet"))

for idx := range c.Config.Endpoints {
err = clientPool.AddClient(&c.Config.Endpoints[idx])
Expand All @@ -71,7 +75,7 @@ func (c *Coordinator) Run(ctx context.Context) error {

// init webserver
if c.Config.Web != nil && c.Config.Web.Server != nil {
c.webserver, err = server.NewWebServer(c.Config.Web.Server, c.log)
c.webserver, err = server.NewWebServer(c.Config.Web.Server, c.log.GetLogger())
if err != nil {
return err
}
Expand All @@ -94,7 +98,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
}

// load validator names
c.validatorNames = names.NewValidatorNames(c.Config.ValidatorNames, c.log)
c.validatorNames = names.NewValidatorNames(c.Config.ValidatorNames, c.log.GetLogger())
c.validatorNames.LoadValidatorNames()

// load tests
Expand All @@ -103,25 +107,29 @@ func (c *Coordinator) Run(ctx context.Context) error {
return err
}

c.log.Infof("Loaded %v tests", len(c.tests))
c.log.GetLogger().Infof("Loaded %v tests", len(c.tests))

// run tests
c.runTests(ctx)

if c.webserver == nil {
c.log.WithField("seconds", c.lameDuckSeconds).Info("Initiating lame duck")
c.log.GetLogger().WithField("seconds", c.lameDuckSeconds).Info("Initiating lame duck")
time.Sleep(time.Duration(c.lameDuckSeconds) * time.Second)
c.log.Info("lame duck complete")
c.log.GetLogger().Info("lame duck complete")
} else {
<-ctx.Done()
}

c.log.Info("Shutting down..")
c.log.GetLogger().Info("Shutting down..")

return nil
}

func (c *Coordinator) Logger() logrus.FieldLogger {
return c.log.GetLogger()
}

func (c *Coordinator) LogScope() *logger.LogScope {
return c.log
}

Expand Down Expand Up @@ -149,7 +157,7 @@ func (c *Coordinator) GetTests() []types.Test {
}

func (c *Coordinator) startMetrics() error {
c.log.
c.log.GetLogger().
Info(fmt.Sprintf("Starting metrics server on :%v", c.metricsPort))

http.Handle("/metrics", promhttp.Handler())
Expand Down
12 changes: 12 additions & 0 deletions pkg/coordinator/logger/logscope.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,15 @@ func (ls *LogScope) GetLogEntries() []*logrus.Entry {

return entries
}

func (ls *LogScope) GetLogEntriesSince(since int64) []*logrus.Entry {
entries := ls.GetLogEntries()

for idx, entry := range entries {
if entry.Time.UnixNano() > since {
return entries[idx:]
}
}

return []*logrus.Entry{}
}
2 changes: 2 additions & 0 deletions pkg/coordinator/types/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package types

import (
"github.com/ethpandaops/assertoor/pkg/coordinator/clients"
"github.com/ethpandaops/assertoor/pkg/coordinator/logger"
"github.com/ethpandaops/assertoor/pkg/coordinator/names"
"github.com/ethpandaops/assertoor/pkg/coordinator/wallet"
"github.com/sirupsen/logrus"
)

type Coordinator interface {
Logger() logrus.FieldLogger
LogScope() *logger.LogScope
ClientPool() *clients.ClientPool
WalletManager() *wallet.Manager
ValidatorNames() *names.ValidatorNames
Expand Down
73 changes: 73 additions & 0 deletions pkg/coordinator/web/handlers/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package handlers

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
)

type Logs struct {
Log []*LogsEntry `json:"log"`
}

type LogsEntry struct {
TIdx int64 `json:"tidx"`
Time time.Time `json:"time"`
Level uint64 `json:"level"`
Message string `json:"msg"`
DataLen uint64 `json:"datalen"`
Data map[string]string `json:"data"`
}

func (fh *FrontendHandler) LogsData(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

sinceTime, err := strconv.ParseInt(vars["since"], 10, 64)
if err != nil {
fmt.Printf("err: %v", err)
sinceTime = 0
}

pageData := fh.getLogsPageData(sinceTime)

w.Header().Set("Content-Type", "application/json")

err = json.NewEncoder(w).Encode(pageData)
if err != nil {
logrus.WithError(err).Error("error encoding test data")

//nolint:gocritic // ignore
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
}
}

func (fh *FrontendHandler) getLogsPageData(since int64) *Logs {
pageData := &Logs{}

taskLog := fh.coordinator.LogScope().GetLogEntriesSince(since)
pageData.Log = make([]*LogsEntry, len(taskLog))

for i, log := range taskLog {
logData := &LogsEntry{
TIdx: log.Time.UnixNano(),
Time: log.Time,
Level: uint64(log.Level),
Message: log.Message,
Data: map[string]string{},
DataLen: uint64(len(log.Data)),
}

for dataKey, dataVal := range log.Data {
logData.Data[dataKey] = fmt.Sprintf("%v", dataVal)
}

pageData.Log[i] = logData
}

return pageData
}
1 change: 1 addition & 0 deletions pkg/coordinator/web/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (ws *WebServer) StartFrontend(config *types.FrontendConfig, coordinator coo
ws.router.HandleFunc("/", frontendHandler.Index).Methods("GET")
ws.router.HandleFunc("/test/{testIdx}", frontendHandler.Test).Methods("GET")
ws.router.HandleFunc("/clients", frontendHandler.Clients).Methods("GET")
ws.router.HandleFunc("/logs/{since}", frontendHandler.LogsData).Methods("GET")

ws.router.PathPrefix("/").Handler(frontend)
}
Expand Down

0 comments on commit de6c41f

Please sign in to comment.