From f8eff2417dcf5df2f45c9982a7018f24b52af6b2 Mon Sep 17 00:00:00 2001 From: Ron Date: Mon, 13 Jul 2020 03:37:23 +0300 Subject: [PATCH 1/3] fixed histograms and added filesystem metrics --- go.mod | 3 + go.sum | 3 + instrumentation/metric/gauge.go | 12 +++ instrumentation/metric/histogram.go | 81 ++++++++++++++++--- .../metric/histogram_prometheus_test.go | 4 +- instrumentation/metric/registry.go | 28 ++++--- .../filesystem/file_system_persistence.go | 37 +++++++-- .../blockstorage/internodesync/factory.go | 47 ++++++----- .../benchmarkconsensus/service.go | 4 +- .../leanhelixconsensus/service.go | 4 +- .../leanhelixconsensus/test/harness.go | 10 +-- services/consensuscontext/service.go | 6 +- .../timestampfinder/timestamp_finder.go | 2 +- .../adapter/tcp/outgoing_connections.go | 4 +- .../native/adapter/native_compiler.go | 12 +-- .../processor/native/compiling_repository.go | 4 +- services/processor/native/service.go | 2 +- services/publicapi/service.go | 6 +- services/transactionpool/pending_pool.go | 4 +- 19 files changed, 190 insertions(+), 83 deletions(-) diff --git a/go.mod b/go.mod index 3afdc650e..e6eff15b4 100644 --- a/go.mod +++ b/go.mod @@ -11,12 +11,15 @@ require ( github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b github.com/cespare/cp v1.1.1 // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd + github.com/coreos/etcd v3.3.13+incompatible + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/deckarep/golang-set v1.7.1 // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/elastic/gosigar v0.10.4 // indirect github.com/ethereum/go-ethereum v1.9.6 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect + github.com/go-stack/stack v1.8.0 // indirect github.com/google/go-cmp v0.3.1 github.com/huin/goupnp v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.1 // indirect diff --git a/go.sum b/go.sum index 6888b5b8d..8bb23e49c 100644 --- a/go.sum +++ b/go.sum @@ -39,12 +39,15 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= diff --git a/instrumentation/metric/gauge.go b/instrumentation/metric/gauge.go index 2a7f54756..822d99cd8 100644 --- a/instrumentation/metric/gauge.go +++ b/instrumentation/metric/gauge.go @@ -58,6 +58,18 @@ func (g *Gauge) Update(i int64) { atomic.StoreInt64(&g.value, i) } +func (g *Gauge) UpdateMax(i int64) { + for { // retry indefinitely (?!) if optimistic lock fails + old := g.Value() + if old >= i { + return + } + if atomic.CompareAndSwapInt64(&g.value, old, i) { // optimistic lock + return + } + } +} + func (g *Gauge) UpdateUInt32(i int32) { atomic.StoreInt64(&g.value, int64(i)) } diff --git a/instrumentation/metric/histogram.go b/instrumentation/metric/histogram.go index fd2f21ec7..b2df0f701 100644 --- a/instrumentation/metric/histogram.go +++ b/instrumentation/metric/histogram.go @@ -14,33 +14,27 @@ import ( "time" ) -type Histogram struct { +type HistogramTimeDiff struct { namedMetric histo *hdrhistogram.WindowedHistogram overflowCount int64 } -func newHistogram(name string, max int64, n int) *Histogram { - return &Histogram{ +func newHistogramTimeDiff(name string, max int64, n int) *HistogramTimeDiff { + return &HistogramTimeDiff{ namedMetric: namedMetric{name: name}, histo: hdrhistogram.NewWindowed(n, 0, max, 1), } } -func (h *Histogram) RecordSince(t time.Time) { +func (h *HistogramTimeDiff) RecordSince(t time.Time) { d := time.Since(t).Nanoseconds() if err := h.histo.Current.RecordValue(int64(d)); err != nil { atomic.AddInt64(&h.overflowCount, 1) } } -func (h *Histogram) Record(measurement int64) { - if err := h.histo.Current.RecordValue(measurement); err != nil { - atomic.AddInt64(&h.overflowCount, 1) - } -} - -func (h *Histogram) String() string { +func (h *HistogramTimeDiff) String() string { var errorRate float64 histo := h.histo.Current @@ -63,7 +57,7 @@ func (h *Histogram) String() string { errorRate) } -func (h *Histogram) Export() exportedMetric { +func (h *HistogramTimeDiff) Export() exportedMetric { histo := h.histo.Merge() return &histogramExport{ @@ -78,6 +72,67 @@ func (h *Histogram) Export() exportedMetric { } } -func (h *Histogram) Rotate() { +func (h *HistogramTimeDiff) Rotate() { + h.histo.Rotate() +} + +type HistogramInt64 struct { // TODO DRY HistogramTimeDiff is similar + namedMetric + histo *hdrhistogram.WindowedHistogram + overflowCount int64 +} + +func newHistogramInt64(name string, max int64, n int) *HistogramInt64 { + return &HistogramInt64{ + namedMetric: namedMetric{name: name}, + histo: hdrhistogram.NewWindowed(n, 0, max, 1), + } +} + +func (h *HistogramInt64) Record(measurement int64) { + if err := h.histo.Current.RecordValue(measurement); err != nil { + atomic.AddInt64(&h.overflowCount, 1) + } +} + +func (h *HistogramInt64) String() string { + var errorRate float64 + histo := h.histo.Current + + if h.overflowCount > 0 { + errorRate = float64(histo.TotalCount()) / float64(h.overflowCount) + } else { + errorRate = 0 + } + + return fmt.Sprintf( + "metric %s: [min=%d, p50=%d, p95=%d, p99=%d, max=%d, avg=%f, samples=%d, error rate=%f]\n", + h.name, + histo.Min(), + histo.ValueAtQuantile(50), + histo.ValueAtQuantile(95), + histo.ValueAtQuantile(99), + histo.Max(), + histo.Mean(), + histo.TotalCount(), + errorRate) +} + +func (h *HistogramInt64) Export() exportedMetric { + histo := h.histo.Merge() + + return &histogramExport{ + h.name, + float64(histo.Min()), + float64(histo.ValueAtQuantile(50)), + float64(histo.ValueAtQuantile(95)), + float64(histo.ValueAtQuantile(99)), + float64(histo.Max()), + histo.Mean(), + histo.TotalCount(), + } +} + +func (h *HistogramInt64) Rotate() { h.histo.Rotate() } diff --git a/instrumentation/metric/histogram_prometheus_test.go b/instrumentation/metric/histogram_prometheus_test.go index 5585cc55b..5b1880f37 100644 --- a/instrumentation/metric/histogram_prometheus_test.go +++ b/instrumentation/metric/histogram_prometheus_test.go @@ -14,13 +14,13 @@ import ( "time" ) -// This does NOT test correctness of Histogram +// This does NOT test correctness of NewHistogramInt64 // (e.g. that calculation of quantiles for given values is correct) // It only verifies the accurate conversion of metric values into Prometheus format. func Test_PrometheusFormatterForHistogramWithLabels(t *testing.T) { r := NewRegistry().WithVirtualChainId(100000) const SEC = int64(time.Second) - histo := r.NewHistogram("Some.Latency", 1000*SEC) + histo := r.NewHistogramInt64("Some.Latency", 1000*SEC) for i := 0; i < 1000; i++ { histo.Record(int64(i) * SEC) diff --git a/instrumentation/metric/registry.go b/instrumentation/metric/registry.go index 90f96b888..b3294faf5 100644 --- a/instrumentation/metric/registry.go +++ b/instrumentation/metric/registry.go @@ -23,9 +23,14 @@ import ( const ROTATE_INTERVAL = 30 * time.Second const AGGREGATION_SPAN = 10 * time.Minute +type rotator interface { + Rotate() +} + type Factory interface { - NewHistogram(name string, maxValue int64) *Histogram - NewLatency(name string, maxDuration time.Duration) *Histogram + NewHistogramTimeDiff(name string, maxValue int64) *HistogramTimeDiff + NewHistogramInt64(name string, maxValue int64) *HistogramInt64 + NewLatency(name string, maxDuration time.Duration) *HistogramTimeDiff NewGauge(name string) *Gauge NewRate(name string) *Rate NewText(name string, defaultValue ...string) *Text @@ -123,14 +128,20 @@ func (r *inMemoryRegistry) NewGauge(name string) *Gauge { return g } -func (r *inMemoryRegistry) NewLatency(name string, maxDuration time.Duration) *Histogram { - h := newHistogram(name, maxDuration.Nanoseconds(), int(AGGREGATION_SPAN/ROTATE_INTERVAL)) +func (r *inMemoryRegistry) NewLatency(name string, maxDuration time.Duration) *HistogramTimeDiff { + h := newHistogramTimeDiff(name, maxDuration.Nanoseconds(), int(AGGREGATION_SPAN/ROTATE_INTERVAL)) + r.register(h) + return h +} + +func (r *inMemoryRegistry) NewHistogramTimeDiff(name string, maxValue int64) *HistogramTimeDiff { + h := newHistogramTimeDiff(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL)) r.register(h) return h } -func (r *inMemoryRegistry) NewHistogram(name string, maxValue int64) *Histogram { - h := newHistogram(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL)) +func (r *inMemoryRegistry) NewHistogramInt64(name string, maxValue int64) *HistogramInt64 { + h := newHistogramInt64(name, maxValue, int(AGGREGATION_SPAN/ROTATE_INTERVAL)) r.register(h) return h } @@ -170,9 +181,8 @@ func (r *inMemoryRegistry) PeriodicallyRotate(ctx context.Context, logger log.Lo r.mu.Lock() defer r.mu.Unlock() for _, m := range r.mu.metrics { - switch m.(type) { - case *Histogram: // only Histograms currently require rotating - m.(*Histogram).Rotate() + if rotator, ok := m.(rotator); ok { // TODO check how much faster it is to check for concrete types + rotator.Rotate() } } }, nil) diff --git a/services/blockstorage/adapter/filesystem/file_system_persistence.go b/services/blockstorage/adapter/filesystem/file_system_persistence.go index ff3a41072..aa2dcc9e0 100644 --- a/services/blockstorage/adapter/filesystem/file_system_persistence.go +++ b/services/blockstorage/adapter/filesystem/file_system_persistence.go @@ -28,14 +28,26 @@ import ( ) type metrics struct { - sizeOnDisk *metric.Gauge + sizeOnDisk *metric.Gauge + readBlockSize *metric.HistogramInt64 + writeBlockSize *metric.HistogramInt64 + readRate *metric.Rate + writeRate *metric.Rate + readMaxBlockSize *metric.Gauge + writeMaxBlockSize *metric.Gauge } const blocksFilename = "blocks" func newMetrics(m metric.Factory) *metrics { return &metrics{ - sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"), + sizeOnDisk: m.NewGauge("BlockStorage.FileSystemSize.Bytes"), + readBlockSize: m.NewHistogramInt64("BlockStorage.Block.Read.Size", 10*1024*1024), // rolling window analysis of block sizes up to 10MB + writeBlockSize: m.NewHistogramInt64("BlockStorage.Block.Write.Size", 10*1024*1024), // rolling window analysis of block sizes up to 10MB + readRate: m.NewRate("BlockStorage.Block.Read.Rate"), + writeRate: m.NewRate("BlockStorage.Block.Write.Rate"), + readMaxBlockSize: m.NewGauge("BlockStorage.Block.Read.Max.Bytes"), + writeMaxBlockSize: m.NewGauge("BlockStorage.Block.Write.Max.Bytes"), } } @@ -227,7 +239,7 @@ func newFileBlockWriter(file *os.File, codec blockCodec, nextBlockOffset int64) func buildIndex(r io.Reader, firstBlockOffset int64, logger log.Logger, c blockCodec) (*blockHeightIndex, error) { bhIndex := newBlockHeightIndex(logger, firstBlockOffset) - offset := int64(firstBlockOffset) + offset := firstBlockOffset for { aBlock, blockSize, err := c.decode(r) if err != nil { @@ -269,6 +281,10 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer } f.metrics.sizeOnDisk.Add(int64(n)) + f.metrics.writeBlockSize.Record(int64(n)) + f.metrics.writeMaxBlockSize.UpdateMax(int64(n)) + f.metrics.writeRate.Measure(1) + return true, f.bhIndex.getLastBlockHeight(), nil } @@ -295,6 +311,7 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint } page := make([]*protocol.BlockPairContainer, 0, pageSize) // TODO: Gad allow update of sequence height inside page + // TODO extract this loop to a fetchBlocksFromFile method that will not seek every time for height := fromHeight; height <= toHeight; height++ { aBlock, err := f.fetchBlockFromFile(height, file) if err != nil { @@ -319,14 +336,22 @@ func (f *BlockPersistence) ScanBlocks(from primitives.BlockHeight, pageSize uint func (f *BlockPersistence) fetchBlockFromFile(height primitives.BlockHeight, file *os.File) (*protocol.BlockPairContainer, error) { initialOffset, ok := f.bhIndex.fetchBlockOffset(height) if !ok { - return nil, fmt.Errorf("failed to find requested block %d", uint64(height)) + return nil, fmt.Errorf("failed to find requested block %d", height) } newOffset, err := file.Seek(initialOffset, io.SeekStart) if newOffset != initialOffset || err != nil { return nil, errors.Wrapf(err, "failed to seek in blocks file to position %v", initialOffset) } - aBlock, _, err := f.codec.decode(file) - return aBlock, err + + aBlock, bytes, err := f.codec.decode(file) + if err != nil { + return nil, errors.Wrapf(err, "failed to decode block %d at position %v", height, initialOffset) + } + + f.metrics.readBlockSize.Record(int64(bytes)) + f.metrics.readRate.Measure(1) + f.metrics.readMaxBlockSize.UpdateMax(int64(bytes)) + return aBlock, nil } func (f *BlockPersistence) GetLastBlockHeight() (primitives.BlockHeight, error) { diff --git a/services/blockstorage/internodesync/factory.go b/services/blockstorage/internodesync/factory.go index b3a7b81f8..b04b60511 100644 --- a/services/blockstorage/internodesync/factory.go +++ b/services/blockstorage/internodesync/factory.go @@ -61,12 +61,12 @@ func NewStateFactoryWithTimers( ) *stateFactory { f := &stateFactory{ - config: config, - gossip: gossip, - storage: storage, - conduit: conduit, - logger: logger, - metrics: newStateMetrics(factory), + config: config, + gossip: gossip, + storage: storage, + conduit: conduit, + logger: logger, + metrics: newStateMetrics(factory), } if createCollectTimeoutTimer == nil { @@ -98,7 +98,6 @@ func (f *stateFactory) getSyncBlocksOrder() gossipmessages.SyncBlocksOrder { } } - func (f *stateFactory) defaultCreateCollectTimeoutTimer() *synchronization.Timer { return synchronization.NewTimer(f.config.BlockSyncCollectResponseTimeout()) } @@ -123,12 +122,12 @@ func (f *stateFactory) CreateIdleState() syncState { func (f *stateFactory) CreateCollectingAvailabilityResponseState() syncState { return &collectingAvailabilityResponsesState{ - factory: f, - client: newBlockSyncGossipClient(f.gossip, f.storage, f.logger, f.config.BlockSyncNumBlocksInBatch, f.config.NodeAddress), - createTimer: f.createCollectTimeoutTimer, - logger: f.logger, - conduit: f.conduit, - metrics: f.metrics.collectingStateMetrics, + factory: f, + client: newBlockSyncGossipClient(f.gossip, f.storage, f.logger, f.config.BlockSyncNumBlocksInBatch, f.config.NodeAddress), + createTimer: f.createCollectTimeoutTimer, + logger: f.logger, + conduit: f.conduit, + metrics: f.metrics.collectingStateMetrics, } } @@ -155,12 +154,12 @@ func (f *stateFactory) CreateWaitingForChunksState(sourceNodeAddress primitives. func (f *stateFactory) CreateProcessingBlocksState(message *gossipmessages.BlockSyncResponseMessage) syncState { return &processingBlocksState{ - blocks: message, - factory: f, - logger: f.logger, - storage: f.storage, - conduit: f.conduit, - metrics: f.metrics.processingStateMetrics, + blocks: message, + factory: f, + logger: f.logger, + storage: f.storage, + conduit: f.conduit, + metrics: f.metrics.processingStateMetrics, } } @@ -173,32 +172,32 @@ type stateMetrics struct { } type idleStateMetrics struct { - timeSpentInState *metric.Histogram + timeSpentInState *metric.HistogramTimeDiff timesReset *metric.Gauge timesExpired *metric.Gauge } type collectingStateMetrics struct { - timeSpentInState *metric.Histogram + timeSpentInState *metric.HistogramTimeDiff timesSucceededSendingAvailabilityRequest *metric.Gauge timesFailedSendingAvailabilityRequest *metric.Gauge } type finishedCollectingStateMetrics struct { - timeSpentInState *metric.Histogram + timeSpentInState *metric.HistogramTimeDiff finishedWithNoResponsesCount *metric.Gauge finishedWithSomeResponsesCount *metric.Gauge } type waitingStateMetrics struct { - timeSpentInState *metric.Histogram + timeSpentInState *metric.HistogramTimeDiff timesTimeout *metric.Gauge timesSuccessful *metric.Gauge timesByzantine *metric.Gauge } type processingStateMetrics struct { - timeSpentInState *metric.Histogram + timeSpentInState *metric.HistogramTimeDiff blocksRate *metric.Rate committedBlocks *metric.Gauge failedCommitBlocks *metric.Gauge diff --git a/services/consensusalgo/benchmarkconsensus/service.go b/services/consensusalgo/benchmarkconsensus/service.go index 6559861d1..8b300973b 100644 --- a/services/consensusalgo/benchmarkconsensus/service.go +++ b/services/consensusalgo/benchmarkconsensus/service.go @@ -60,10 +60,10 @@ type Service struct { } type metrics struct { - consensusRoundTickTime *metric.Histogram + consensusRoundTickTime *metric.HistogramTimeDiff failedConsensusTicksRate *metric.Rate timedOutConsensusTicksRate *metric.Rate - votingTime *metric.Histogram + votingTime *metric.HistogramTimeDiff lastCommittedTime *metric.Gauge } diff --git a/services/consensusalgo/leanhelixconsensus/service.go b/services/consensusalgo/leanhelixconsensus/service.go index 8967b9293..e6e635f32 100644 --- a/services/consensusalgo/leanhelixconsensus/service.go +++ b/services/consensusalgo/leanhelixconsensus/service.go @@ -46,8 +46,8 @@ type Service struct { } type metrics struct { - timeSinceLastCommitMillis *metric.Histogram - timeSinceLastElectionMillis *metric.Histogram + timeSinceLastCommitMillis *metric.HistogramTimeDiff + timeSinceLastElectionMillis *metric.HistogramTimeDiff currentLeaderMemberId *metric.Text currentElectionCount *metric.Gauge lastCommittedTime *metric.Gauge diff --git a/services/consensusalgo/leanhelixconsensus/test/harness.go b/services/consensusalgo/leanhelixconsensus/test/harness.go index c4facd740..1527d0819 100644 --- a/services/consensusalgo/leanhelixconsensus/test/harness.go +++ b/services/consensusalgo/leanhelixconsensus/test/harness.go @@ -50,8 +50,8 @@ type singleLhcNodeHarness struct { } type metrics struct { - timeSinceLastCommitMillis *metric.Histogram - timeSinceLastElectionMillis *metric.Histogram + timeSinceLastCommitMillis *metric.HistogramTimeDiff + timeSinceLastElectionMillis *metric.HistogramTimeDiff currentLeaderMemberId *metric.Text currentElectionCount *metric.Gauge lastCommittedTime *metric.Gauge @@ -108,8 +108,8 @@ func (h *singleLhcNodeHarness) start(parent *with.ConcurrencyHarness, ctx contex // NOTICE in the test harness these values always exist so no checking of nil return values func (h *singleLhcNodeHarness) getMetrics() *metrics { return &metrics{ - timeSinceLastCommitMillis: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.TimeSinceLastCommit.Millis").(*metric.Histogram), - timeSinceLastElectionMillis: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.TimeSinceLastElection.Millis").(*metric.Histogram), + timeSinceLastCommitMillis: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.TimeSinceLastCommit.Millis").(*metric.HistogramTimeDiff), + timeSinceLastElectionMillis: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.TimeSinceLastElection.Millis").(*metric.HistogramTimeDiff), currentElectionCount: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.CurrentElection.Number").(*metric.Gauge), currentLeaderMemberId: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.CurrentLeaderMemberId.Number").(*metric.Text), lastCommittedTime: h.metricRegistry.Get("ConsensusAlgo.LeanHelix.LastCommitted.TimeNano").(*metric.Gauge), @@ -146,7 +146,7 @@ func (h *singleLhcNodeHarness) beFirstInCommittee() { func (h *singleLhcNodeHarness) expectConsensusContextRequestOrderingCommittee(leaderNodeIndex int) { h.consensusContext.When("RequestOrderingCommittee", mock.Any, mock.Any).Return(&services.RequestCommitteeOutput{ NodeAddresses: h.getCommitteeWithNodeIndexAsLeader(leaderNodeIndex), - Weights: h.getCommitteeWeights(), + Weights: h.getCommitteeWeights(), }, nil).Times(1) } diff --git a/services/consensuscontext/service.go b/services/consensuscontext/service.go index 12ec7177f..797f6fb2c 100644 --- a/services/consensuscontext/service.go +++ b/services/consensuscontext/service.go @@ -20,9 +20,9 @@ import ( var LogTag = log.Service("consensus-context") type metrics struct { - createTxBlockTime *metric.Histogram - createResultsBlockTime *metric.Histogram - processTransactionsSeInCreateResultsBlock *metric.Histogram + createTxBlockTime *metric.HistogramTimeDiff + createResultsBlockTime *metric.HistogramTimeDiff + processTransactionsSeInCreateResultsBlock *metric.HistogramTimeDiff transactionsRate *metric.Rate committeeSize *metric.Gauge committeeMembers *metric.Text diff --git a/services/crosschainconnector/ethereum/timestampfinder/timestamp_finder.go b/services/crosschainconnector/ethereum/timestampfinder/timestamp_finder.go index 443c13e08..2718361eb 100644 --- a/services/crosschainconnector/ethereum/timestampfinder/timestamp_finder.go +++ b/services/crosschainconnector/ethereum/timestampfinder/timestamp_finder.go @@ -37,7 +37,7 @@ type finder struct { } type timestampBlockFinderMetrics struct { - timeToFindBlock *metric.Histogram + timeToFindBlock *metric.HistogramTimeDiff stepsRequired *metric.Rate totalTimesCalled *metric.Gauge cacheHits *metric.Gauge diff --git a/services/gossip/adapter/tcp/outgoing_connections.go b/services/gossip/adapter/tcp/outgoing_connections.go index 11b707430..b061d6a83 100644 --- a/services/gossip/adapter/tcp/outgoing_connections.go +++ b/services/gossip/adapter/tcp/outgoing_connections.go @@ -18,7 +18,7 @@ type outgoingConnectionMetrics struct { sendQueueErrors *metric.Gauge activeCount *metric.Gauge - messageSize *metric.Histogram + messageSize *metric.HistogramInt64 } type outgoingConnections struct { @@ -52,7 +52,7 @@ func createOutgoingConnectionMetrics(registry metric.Registry) *outgoingConnecti KeepaliveErrors: registry.NewGauge("Gossip.OutgoingConnection.KeepaliveErrors.Count"), sendQueueErrors: registry.NewGauge("Gossip.OutgoingConnection.SendQueueErrors.Count"), activeCount: registry.NewGauge("Gossip.OutgoingConnection.Active.Count"), - messageSize: registry.NewHistogram("Gossip.OutgoingConnection.MessageSize.Bytes", MAX_PAYLOAD_SIZE_BYTES), + messageSize: registry.NewHistogramInt64("Gossip.OutgoingConnection.MessageSize.Bytes", MAX_PAYLOAD_SIZE_BYTES), } } diff --git a/services/processor/native/adapter/native_compiler.go b/services/processor/native/adapter/native_compiler.go index 8627f82dc..acaf72c7d 100644 --- a/services/processor/native/adapter/native_compiler.go +++ b/services/processor/native/adapter/native_compiler.go @@ -37,11 +37,11 @@ var LogTag = log.String("adapter", "processor-native") type nativeCompilerMetrics struct { lastWarmUpTimeMs *metric.Gauge - totalCompileTime *metric.Histogram - writeToDiskTime *metric.Histogram - buildTime *metric.Histogram - loadTime *metric.Histogram - sourceSize *metric.Histogram + totalCompileTime *metric.HistogramTimeDiff + writeToDiskTime *metric.HistogramTimeDiff + buildTime *metric.HistogramTimeDiff + loadTime *metric.HistogramTimeDiff + sourceSize *metric.HistogramInt64 } type nativeCompiler struct { @@ -57,7 +57,7 @@ func createNativeCompilerMetrics(factory metric.Factory) *nativeCompilerMetrics loadTime: factory.NewLatency("Processor.Native.Compiler.LoadObject.Time.Millis", 60*time.Minute), lastWarmUpTimeMs: factory.NewGauge("Processor.Native.Compiler.LastWarmUp.Time.Millis"), writeToDiskTime: factory.NewLatency("Processor.Native.Compiler.WriteToDisk.Time.Millis", 60*time.Minute), - sourceSize: factory.NewHistogram("Processor.Native.Compiler.Source.Size.Bytes", 1024*1024), // megabyte + sourceSize: factory.NewHistogramInt64("Processor.Native.Compiler.Source.Size.Bytes", 1024*1024), // megabyte } } diff --git a/services/processor/native/compiling_repository.go b/services/processor/native/compiling_repository.go index 0d7a4eb2b..5de4ed476 100644 --- a/services/processor/native/compiling_repository.go +++ b/services/processor/native/compiling_repository.go @@ -61,8 +61,8 @@ type CompilingRepository struct { sanitizer *sanitizer.Sanitizer deployedContracts *metric.Gauge - processCallTime *metric.Histogram - contractCompilationTime *metric.Histogram + processCallTime *metric.HistogramTimeDiff + contractCompilationTime *metric.HistogramTimeDiff config config.NativeProcessorConfig } diff --git a/services/processor/native/service.go b/services/processor/native/service.go index dcc328218..fc50ab60c 100644 --- a/services/processor/native/service.go +++ b/services/processor/native/service.go @@ -47,7 +47,7 @@ type service struct { } type metrics struct { - processCallTime *metric.Histogram + processCallTime *metric.HistogramTimeDiff } func getMetrics(m metric.Factory) *metrics { diff --git a/services/publicapi/service.go b/services/publicapi/service.go index 8419fb6d0..5b3d0b61a 100644 --- a/services/publicapi/service.go +++ b/services/publicapi/service.go @@ -34,9 +34,9 @@ type service struct { } type metrics struct { - sendTransactionTime *metric.Histogram - getTransactionStatusTime *metric.Histogram - runQueryTime *metric.Histogram + sendTransactionTime *metric.HistogramTimeDiff + getTransactionStatusTime *metric.HistogramTimeDiff + runQueryTime *metric.HistogramTimeDiff totalTransactionsFromClients *metric.Gauge totalTransactionsErrNilRequest *metric.Gauge totalTransactionsErrInvalidRequest *metric.Gauge diff --git a/services/transactionpool/pending_pool.go b/services/transactionpool/pending_pool.go index 9ad09a7ac..de5e392a4 100644 --- a/services/transactionpool/pending_pool.go +++ b/services/transactionpool/pending_pool.go @@ -42,8 +42,8 @@ type pendingPoolMetrics struct { transactionCountGauge *metric.Gauge poolSizeInBytesGauge *metric.Gauge transactionRatePerSecond *metric.Rate - transactionSpentInQueue *metric.Histogram - transactionServiceTime *metric.Histogram + transactionSpentInQueue *metric.HistogramTimeDiff + transactionServiceTime *metric.HistogramTimeDiff } func newPendingPoolMetrics(factory metric.Factory) *pendingPoolMetrics { From 0dff24ea168843899c76ebfb931b74a6286f6089 Mon Sep 17 00:00:00 2001 From: Ron Date: Thu, 16 Jul 2020 15:44:47 +0300 Subject: [PATCH 2/3] remove unused packages --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index e6eff15b4..d0e5f4075 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,6 @@ require ( github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b github.com/cespare/cp v1.1.1 // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/coreos/etcd v3.3.13+incompatible - github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/deckarep/golang-set v1.7.1 // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/elastic/gosigar v0.10.4 // indirect From ec3744ed997faf143b6c16cfd9656b1de4d25d6b Mon Sep 17 00:00:00 2001 From: Ron Date: Sun, 19 Jul 2020 15:13:59 +0300 Subject: [PATCH 3/3] make max update async and stop trying after 1000 failures --- instrumentation/metric/gauge.go | 21 +++++++++++-------- .../filesystem/file_system_persistence.go | 4 ++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/instrumentation/metric/gauge.go b/instrumentation/metric/gauge.go index 822d99cd8..7175f8f34 100644 --- a/instrumentation/metric/gauge.go +++ b/instrumentation/metric/gauge.go @@ -58,16 +58,19 @@ func (g *Gauge) Update(i int64) { atomic.StoreInt64(&g.value, i) } -func (g *Gauge) UpdateMax(i int64) { - for { // retry indefinitely (?!) if optimistic lock fails - old := g.Value() - if old >= i { - return +func (g *Gauge) UpdateMaxAsync(i int64) { + go func() { + const RetryCount = 1000 + for r := 0; r < RetryCount; r++ { // try until optimistic lock succeeds + old := g.Value() + if old >= i { // check max outside of lock + return + } + if atomic.CompareAndSwapInt64(&g.value, old, i) { // optimistic lock + return + } } - if atomic.CompareAndSwapInt64(&g.value, old, i) { // optimistic lock - return - } - } + }() } func (g *Gauge) UpdateUInt32(i int32) { diff --git a/services/blockstorage/adapter/filesystem/file_system_persistence.go b/services/blockstorage/adapter/filesystem/file_system_persistence.go index aa2dcc9e0..22b6d8795 100644 --- a/services/blockstorage/adapter/filesystem/file_system_persistence.go +++ b/services/blockstorage/adapter/filesystem/file_system_persistence.go @@ -282,7 +282,7 @@ func (f *BlockPersistence) WriteNextBlock(blockPair *protocol.BlockPairContainer f.metrics.sizeOnDisk.Add(int64(n)) f.metrics.writeBlockSize.Record(int64(n)) - f.metrics.writeMaxBlockSize.UpdateMax(int64(n)) + f.metrics.writeMaxBlockSize.UpdateMaxAsync(int64(n)) f.metrics.writeRate.Measure(1) return true, f.bhIndex.getLastBlockHeight(), nil @@ -350,7 +350,7 @@ func (f *BlockPersistence) fetchBlockFromFile(height primitives.BlockHeight, fil f.metrics.readBlockSize.Record(int64(bytes)) f.metrics.readRate.Measure(1) - f.metrics.readMaxBlockSize.UpdateMax(int64(bytes)) + f.metrics.readMaxBlockSize.UpdateMaxAsync(int64(bytes)) return aBlock, nil }