Skip to content

Commit

Permalink
Merge pull request #116 from ethpandaops/pk910/fix-eventstream-discon…
Browse files Browse the repository at this point in the history
…nects

workaround for eventstream disconnects
  • Loading branch information
pk910 authored Aug 26, 2024
2 parents 65baf7d + b680cf4 commit f51115d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
9 changes: 8 additions & 1 deletion clients/consensus/rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ func (bs *BeaconStream) startStream() {
Ready: true,
}
case err := <-stream.Errors:
bs.logger.Warnf("beacon block stream error: %v", err)
if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") {
// this seems to be a go bug, silently reconnect to the stream
time.Sleep(10 * time.Millisecond)
stream.RetryNow()
} else {
bs.logger.Warnf("beacon block stream error: %v", err)
}

select {
case bs.ReadyChan <- &BeaconStreamStatus{
Ready: false,
Expand Down
16 changes: 15 additions & 1 deletion clients/consensus/rpc/eventstream/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventstream

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -34,6 +35,8 @@ type Stream struct {
isClosed bool
// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
closeMutex sync.Mutex
// retrySleepCancel is a function that can be called to cancel the retry sleep
retrySleepCancel context.CancelFunc
}

type StreamEvent interface {
Expand Down Expand Up @@ -114,6 +117,13 @@ func (stream *Stream) Close() {
}()
}

// RetryNow will force the stream to reconnect a disconnected stream immediately.
func (stream *Stream) RetryNow() {
if cancelFn := stream.retrySleepCancel; cancelFn != nil {
cancelFn()
}
}

// Go's http package doesn't copy headers across when it encounters
// redirects so we need to do that manually.
func checkRedirect(req *http.Request, via []*http.Request) error {
Expand Down Expand Up @@ -215,7 +225,11 @@ func (stream *Stream) retryRestartStream() {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}

time.Sleep(backoff)
ctx, cancel := context.WithTimeout(context.Background(), backoff)
stream.retrySleepCancel = cancel
<-ctx.Done()

stream.retrySleepCancel = nil

if stream.isClosed {
return
Expand Down

0 comments on commit f51115d

Please sign in to comment.