From b680cf42355dd501704266aa8d6b6816a32b46ca Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 25 Aug 2024 13:56:33 +0200 Subject: [PATCH] workaround for eventstream disconnects --- clients/consensus/rpc/beaconstream.go | 9 ++++++++- clients/consensus/rpc/eventstream/eventstream.go | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/clients/consensus/rpc/beaconstream.go b/clients/consensus/rpc/beaconstream.go index 855f3a2..be6fd92 100644 --- a/clients/consensus/rpc/beaconstream.go +++ b/clients/consensus/rpc/beaconstream.go @@ -93,7 +93,14 @@ func (bs *BeaconStream) startStream() { Ready: true, } case err := <-stream.Errors: - bs.logger.Warnf("beacon block stream error: %v", err) + if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") { + // this seems to be a go bug, silently reconnect to the stream + time.Sleep(10 * time.Millisecond) + stream.RetryNow() + } else { + bs.logger.Warnf("beacon block stream error: %v", err) + } + select { case bs.ReadyChan <- &BeaconStreamStatus{ Ready: false, diff --git a/clients/consensus/rpc/eventstream/eventstream.go b/clients/consensus/rpc/eventstream/eventstream.go index 920e39e..3afb3f2 100644 --- a/clients/consensus/rpc/eventstream/eventstream.go +++ b/clients/consensus/rpc/eventstream/eventstream.go @@ -1,6 +1,7 @@ package eventstream import ( + "context" "errors" "fmt" "io" @@ -34,6 +35,8 @@ type Stream struct { isClosed bool // isClosedMutex is a mutex protecting concurrent read/write access of isClosed closeMutex sync.Mutex + // retrySleepCancel is a function that can be called to cancel the retry sleep + retrySleepCancel context.CancelFunc } type StreamEvent interface { @@ -114,6 +117,13 @@ func (stream *Stream) Close() { }() } +// RetryNow will force the stream to reconnect a disconnected stream immediately. +func (stream *Stream) RetryNow() { + if cancelFn := stream.retrySleepCancel; cancelFn != nil { + cancelFn() + } +} + // Go's http package doesn't copy headers across when it encounters // redirects so we need to do that manually. func checkRedirect(req *http.Request, via []*http.Request) error { @@ -215,7 +225,11 @@ func (stream *Stream) retryRestartStream() { stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds()) } - time.Sleep(backoff) + ctx, cancel := context.WithTimeout(context.Background(), backoff) + stream.retrySleepCancel = cancel + <-ctx.Done() + + stream.retrySleepCancel = nil if stream.isClosed { return