Skip to content

Commit fe1b184

Browse files
committed
Support timeout when fetching metadata
Fetching metadata from an unreachable cluster of 2 brokers can take more than 3 minutes using the default configuration (30s DialTimeout, 3 retries, 250ms backoff) before failing with ErrOutOfBrokers. Adding config.Metadata.Timeout configuration option to fail faster.
1 parent 8457f0b commit fe1b184

File tree

3 files changed

+102
-4
lines changed

3 files changed

+102
-4
lines changed

client.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,11 @@ func (client *client) RefreshMetadata(topics ...string) error {
435435
}
436436
}
437437

438-
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
438+
deadline := time.Time{}
439+
if client.conf.Metadata.Timeout > 0 {
440+
deadline = time.Now().Add(client.conf.Metadata.Timeout)
441+
}
442+
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
439443
}
440444

441445
func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
@@ -737,20 +741,32 @@ func (client *client) refreshMetadata() error {
737741
return nil
738742
}
739743

740-
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
744+
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
745+
pastDeadline := func(backoff time.Duration) bool {
746+
if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
747+
// we are past the deadline
748+
return true
749+
}
750+
return false
751+
}
741752
retry := func(err error) error {
742753
if attemptsRemaining > 0 {
743754
backoff := client.computeBackoff(attemptsRemaining)
755+
if pastDeadline(backoff) {
756+
Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
757+
return err
758+
}
744759
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
745760
if backoff > 0 {
746761
time.Sleep(backoff)
747762
}
748-
return client.tryRefreshMetadata(topics, attemptsRemaining-1)
763+
return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
749764
}
750765
return err
751766
}
752767

753-
for broker := client.any(); broker != nil; broker = client.any() {
768+
broker := client.any()
769+
for ; broker != nil && !pastDeadline(0); broker = client.any() {
754770
allowAutoTopicCreation := true
755771
if len(topics) > 0 {
756772
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
@@ -788,6 +804,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
788804
}
789805
}
790806

807+
if broker != nil {
808+
Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
809+
return retry(ErrOutOfBrokers)
810+
}
811+
791812
Logger.Println("client/metadata no available broker to send metadata request to")
792813
client.resurrectDeadBrokers()
793814
return retry(ErrOutOfBrokers)

client_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sarama
22

33
import (
4+
"fmt"
45
"io"
56
"sync"
67
"sync/atomic"
@@ -612,6 +613,75 @@ func TestClientController(t *testing.T) {
612613
}
613614
}
614615

616+
func TestClientMetadataTimeout(t *testing.T) {
617+
for _, timeout := range []time.Duration{
618+
250 * time.Millisecond, // Will cut the first retry pass
619+
500 * time.Millisecond, // Will cut the second retry pass
620+
750 * time.Millisecond, // Will cut the third retry pass
621+
900 * time.Millisecond, // Will stop after the three retries
622+
} {
623+
t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
624+
// Use a responsive broker to create a working client
625+
initialSeed := NewMockBroker(t, 0)
626+
emptyMetadata := new(MetadataResponse)
627+
initialSeed.Returns(emptyMetadata)
628+
629+
conf := NewConfig()
630+
// Speed up the metadata request failure because of a read timeout
631+
conf.Net.ReadTimeout = 100 * time.Millisecond
632+
// Disable backoff and refresh
633+
conf.Metadata.Retry.Backoff = 0
634+
conf.Metadata.RefreshFrequency = 0
635+
// But configure a "global" timeout
636+
conf.Metadata.Timeout = timeout
637+
c, err := NewClient([]string{initialSeed.Addr()}, conf)
638+
if err != nil {
639+
t.Fatal(err)
640+
}
641+
initialSeed.Close()
642+
643+
client := c.(*client)
644+
645+
// Start seed brokers that do not reply to anything and therefore a read
646+
// on the TCP connection will timeout to simulate unresponsive brokers
647+
seed1 := NewMockBroker(t, 1)
648+
defer seed1.Close()
649+
seed2 := NewMockBroker(t, 2)
650+
defer seed2.Close()
651+
652+
// Overwrite the seed brokers with a fixed ordering to make this test deterministic
653+
safeClose(t, client.seedBrokers[0])
654+
client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
655+
client.deadSeeds = []*Broker{}
656+
657+
// Start refreshing metadata in the background
658+
errChan := make(chan error)
659+
start := time.Now()
660+
go func() {
661+
errChan <- c.RefreshMetadata()
662+
}()
663+
664+
// Check that the refresh fails fast enough (less than twice the configured timeout)
665+
// instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
666+
maxRefreshDuration := 2 * timeout
667+
select {
668+
case err := <-errChan:
669+
t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
670+
if err == nil {
671+
t.Fatal("Expected failed RefreshMetadata, got nil")
672+
}
673+
if err != ErrOutOfBrokers {
674+
t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
675+
}
676+
case <-time.After(maxRefreshDuration):
677+
t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
678+
}
679+
680+
safeClose(t, c)
681+
})
682+
}
683+
}
684+
615685
func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
616686
seedBroker := NewMockBroker(t, 1)
617687
staleCoordinator := NewMockBroker(t, 2)

config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ type Config struct {
121121
// and usually more convenient, but can take up a substantial amount of
122122
// memory if you have many topics and partitions. Defaults to true.
123123
Full bool
124+
125+
// How long to wait for a successful metadata response.
126+
// Disabled by default which means a metadata request against an unreachable
127+
// cluster (all brokers are unreachable or unresponsive) can take up to
128+
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
129+
// to fail.
130+
Timeout time.Duration
124131
}
125132

126133
// Producer is the namespace for configuration related to producing messages,

0 commit comments

Comments
 (0)