Closing a single connection instead of discarding whole connection pool
amsrnjn committed Aug 1, 2023
1 parent 264dfde commit b128174
Showing 6 changed files with 41 additions and 90 deletions.
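In broad strokes: before this commit, any request-level error made the client discard the whole connection pool; after it, only the failing connection ("tube") is closed. A minimal sketch of the behavioral difference, using simplified, hypothetical stand-ins for the real pool types shown in the diffs below:

package main

import "fmt"

type tube struct{ id int }

type tubePool struct {
	session int64
	idle    []*tube
}

// Old behavior (discard): one bad tube bumped the session and
// closed every idle connection in the pool.
func (p *tubePool) discard(t *tube) {
	p.session++ // tubes from the old session are no longer reusable
	for _, it := range p.idle {
		fmt.Printf("closing idle tube %d\n", it.id)
	}
	p.idle = nil
	fmt.Printf("closing tube %d\n", t.id)
}

// New behavior (closeTube): only the offending tube is closed.
func (p *tubePool) closeTube(t *tube) {
	fmt.Printf("closing tube %d\n", t.id)
}

func main() {
	p := &tubePool{idle: []*tube{{id: 1}, {id: 2}}}
	p.closeTube(&tube{id: 3}) // new: tubes 1 and 2 stay pooled and warm
	p.discard(&tube{id: 4})   // old: closes everything and bumps the session
}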
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
Release v1.2.13 (2023-07-13)
===
* On errors, close the single connection instead of destroying the whole pool
* Increase number of retries to 3 for health checks
* Fix an issue where we could end up sending an error to a closed channel

Release v1.2.12 (2023-01-10)
===
* Add support for healthcheck infrastructure
2 changes: 1 addition & 1 deletion dax/internal/client/cluster.go
@@ -642,7 +642,7 @@ func (c *cluster) update(config []serviceEndpoint) error {
for ep, clicfg := range oldActive {
_, isPartOfUpdatedEndpointsConfig := newEndpoints[ep]
if !isPartOfUpdatedEndpointsConfig {
c.debugLog(fmt.Sprintf("Found updated endpoing configs, will close inactive endpoint client : %s", ep.host))
c.debugLog(fmt.Sprintf("Found updated endpoint configs, will close inactive endpoint client : %s", ep.host))
toClose = append(toClose, clicfg)
}
}
31 changes: 21 additions & 10 deletions dax/internal/client/single.go
@@ -169,7 +169,7 @@ func (client *SingleDaxClient) startHealthChecks(cc *cluster, host hostPort) {
ctx, cfn := context.WithTimeout(aws.BackgroundContext(), 1*time.Second)
defer cfn()
var err error
_, err = client.endpoints(RequestOptions{MaxRetries: 2, Context: ctx})
_, err = client.endpoints(RequestOptions{MaxRetries: 3, Context: ctx})
if err != nil {
cc.debugLog(fmt.Sprintf("Health checks failed with error " + err.Error() + " for host :: " + host.host))
cc.onHealthCheckFailed(host)
@@ -740,31 +740,41 @@ func (client *SingleDaxClient) executeWithContext(ctx aws.Context, op string, en
return err
}
if err = client.pool.setDeadline(ctx, t); err != nil {
client.pool.discard(t)
// If the error is only due to context cancellation or timeout,
// the tube is still usable because nothing has been written to it yet
if err == ctx.Err() {
client.pool.put(t)
return err
}
// If setting the deadline fails for any other reason,
// something is probably wrong with the tube
client.pool.closeTube(t)
return err
}

if err = client.auth(t); err != nil {
client.pool.discard(t)
// The auth method writes to the tube, and on error
// there is no guarantee the tube was drained completely
client.pool.closeTube(t)
return err
}

writer := t.CborWriter()
if err = encoder(writer); err != nil {
// Validation errors will cause pool to be discarded as there is no guarantee
// Validation errors will cause the connection to be closed, as there is no guarantee
// that validation was performed before any data was written into the tube
client.pool.discard(t)
client.pool.closeTube(t)
return err
}
if err := writer.Flush(); err != nil {
client.pool.discard(t)
client.pool.closeTube(t)
return err
}

reader := t.CborReader()
ex, err := decodeError(reader)
if err != nil { // decode or network error
client.pool.discard(t)
if err != nil { // decode or network error - the tube may not be completely drained
client.pool.closeTube(t)
return err
}
if ex != nil { // user or server error
@@ -774,7 +784,8 @@

err = decoder(reader)
if err != nil {
client.pool.discard(t)
// we were not able to completely drain the tube
client.pool.closeTube(t)
} else {
client.pool.put(t)
}
@@ -809,7 +820,7 @@ func (client *SingleDaxClient) recycleTube(t tube, err error) {
if recycle {
client.pool.put(t)
} else {
client.pool.discard(t)
client.pool.closeTube(t)
}
}
func (client *SingleDaxClient) auth(t tube) error {
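The executeWithContext changes above all follow one rule: a tube may be reused only when it is known to be clean. A condensed sketch of that decision, with hypothetical simplified types rather than the real client (the real method threads CBOR encode/decode steps between these checks):

package main

import (
	"context"
	"errors"
	"fmt"
)

type tube struct{ id int }

type pool struct{}

func (p *pool) put(t *tube)       { fmt.Printf("reusing tube %d\n", t.id) }
func (p *pool) closeTube(t *tube) { fmt.Printf("closing tube %d\n", t.id) }

// afterStep mirrors the decision made after each step of executeWithContext:
// reuse the tube only when it is clean (never written to, or fully drained).
func afterStep(ctx context.Context, p *pool, t *tube, err error, wrote bool) {
	switch {
	case err == nil:
		p.put(t) // response fully decoded: the tube is drained
	case !wrote && err == ctx.Err():
		p.put(t) // context expired before any bytes were written: still clean
	default:
		p.closeTube(t) // the tube may hold undrained bytes: close only this one
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	afterStep(ctx, &pool{}, &tube{id: 1}, ctx.Err(), false)                 // reused
	afterStep(ctx, &pool{}, &tube{id: 2}, errors.New("decode error"), true) // closed
}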
2 changes: 1 addition & 1 deletion dax/internal/client/tube.go
@@ -25,7 +25,7 @@ import (
)

const magic = "J7yne5G"
const agent = "DaxGoClient-1.2.12"
const agent = "DaxGoClient-1.2.13"

var optional = map[string]string{"UserAgent": agent}

38 changes: 7 additions & 31 deletions dax/internal/client/tubepool.go
@@ -190,14 +190,13 @@ func (p *tubePool) allocAndReleaseGate(session int64, done chan tube, releaseGat
}
} else {
p.mutex.Lock()
cls := p.closed
p.mutex.Unlock()
if !cls {
if !p.closed {
select {
case p.errCh <- err:
default:
}
}
p.mutex.Unlock()
}
if done != nil {
close(done)
@@ -236,9 +235,10 @@ func (p *tubePool) put(t tube) {
p.top = t
}

// Closes the specified tube, and if the tube is using the same version as the current session,
// then also closes all other idle tubes and performs a version bump.
func (p *tubePool) discard(t tube) {
// closeTube closes the given tube. Call it whenever you are not sure the tube is clean.
// A tube is clean when nothing has been written to it, or when
// everything written to it has been drained completely.
func (p *tubePool) closeTube(t tube) {
if t == nil {
return
}
@@ -249,30 +249,6 @@ func (p *tubePool) discard(t tube) {
t.Close()
}()
}

p.mutex.Lock()

var head tube
if t.Session() == p.session {
p.sessionBump()
head = p.clearIdleConnections()
}

// Waiters enter the waiting queue when there's no existing tube
// or when they failed to acquire a permit to create a new tube.
// There's also a chance the newly created tube was stolen and
// the thief must return it back into the pool or discard it.
if p.waiters != nil {
select {
case p.waiters <- nil: // wake up a single waiter, if any
break
default:
close(p.waiters) // or unblock all future waiters who are yet to enter the waiters queue
p.waiters = nil
}
}
p.mutex.Unlock()
p.closeAll(head)
}

// Sets the deadline on the underlying net.Conn object
@@ -303,7 +279,7 @@ func (p *tubePool) Close() error {
p.waiters = nil
}
close(p.errCh)
// cannot close(p.gate) as send on closed channel will panic. new connections will be closed immediately.
}
p.mutex.Unlock()
p.closeAll(head)
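Two separate fixes land in tubepool.go. First, closeTube no longer bumps the session, clears idle connections, or wakes waiters; it just closes the one tube. Second, in allocAndReleaseGate the closed check and the non-blocking send on errCh now happen under the same mutex hold, so Close() cannot close errCh between the check and the send. A minimal sketch of that guarded-send pattern, with hypothetical simplified types:

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mutex  sync.Mutex
	closed bool
	errCh  chan error
}

// reportErr performs the closed check and the send under one mutex hold,
// so a concurrent Close() cannot close errCh in between.
func (p *pool) reportErr(err error) {
	p.mutex.Lock()
	if !p.closed {
		select {
		case p.errCh <- err:
		default: // drop the error if nobody is draining the channel
		}
	}
	p.mutex.Unlock()
}

func (p *pool) Close() {
	p.mutex.Lock()
	if !p.closed {
		p.closed = true
		close(p.errCh)
	}
	p.mutex.Unlock()
}

func main() {
	p := &pool{errCh: make(chan error, 1)}
	p.reportErr(fmt.Errorf("dial failed")) // buffered send succeeds
	p.Close()
	p.reportErr(fmt.Errorf("late error")) // safely dropped after Close
	fmt.Println(<-p.errCh)                // prints "dial failed"
}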
52 changes: 5 additions & 47 deletions dax/internal/client/tubepool_test.go
@@ -509,58 +509,16 @@ func countTubes(pool *tubePool) int {
return count
}

func TestTubePool_DiscardBumpsSession(t *testing.T) {
func TestTubePool_close(t *testing.T) {
p := newTubePoolWithOptions(":1234", tubePoolOptions{1, 5 * time.Second, defaultDialer.DialContext}, connConfigData)
origSession := p.session
p.closeTubeImmediately = true

tt := &mockTube{}
tt.On("Session").Return(p.session).Once()
tt.On("Close").Return(nil).Once()
p.discard(tt)

require.NotEqual(t, origSession, p.session)
}

func TestTubePool_DiscardWakesUpWaiters(t *testing.T) {

p := newTubePoolWithOptions(":1234", tubePoolOptions{1, 5 * time.Second, defaultDialer.DialContext}, connConfigData)
p.dialContext = func(ctx context.Context, a, n string) (net.Conn, error) {
return &mockConn{}, nil
}
// artificially enter the gate to prevent new connections
entered := p.gate.tryEnter()
require.True(t, entered)

var startedWg sync.WaitGroup
startedWg.Add(1)

ch := make(chan struct {
tube
error
})
go func() {
startedWg.Done()
t, err := p.get()
ch <- struct {
tube
error
}{t, err}
}()
startedWg.Wait()
// wait some extra time to make sure the caller has entered waiters queue
time.Sleep(2 * time.Second)

// release the gate to allow woken waiters to establish a new connection
p.gate.exit()
tt := &mockTube{}
tt.On("Session").Return(p.session).Once()
tt.On("Close").Return(nil).Once()

p.discard(tt)

result := <-ch
require.NoError(t, result.error)
require.NotNil(t, result.tube)
p.closeTube(tt)
require.Equal(t, origSession, p.session)
tt.AssertCalled(t, "Close")
}

func TestTubePool_PutClosesTubesIfPoolIsClosed(t *testing.T) {
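The rewritten test sets the pool's closeTubeImmediately flag so the close happens synchronously (the production path appears to defer the Close to a goroutine, as the closure visible in the tubepool.go hunk suggests), which makes asserting on the mock right after the call deterministic. A sketch of why that flag matters in tests, again with hypothetical simplified types:

package main

import "fmt"

type tube interface{ Close() error }

type fakeTube struct{ closed bool }

func (f *fakeTube) Close() error { f.closed = true; return nil }

type pool struct{ closeTubeImmediately bool }

// closeTube closes synchronously when closeTubeImmediately is set;
// otherwise the close is handed off to a goroutine, and an immediate
// assertion on the mock would race with it.
func (p *pool) closeTube(t tube) {
	if p.closeTubeImmediately {
		t.Close()
	} else {
		go t.Close()
	}
}

func main() {
	p := &pool{closeTubeImmediately: true}
	ft := &fakeTube{}
	p.closeTube(ft)
	fmt.Println(ft.closed) // true: safe to assert right after the call
}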
