Skip to content

Commit a09f00e

Browse files
authored
feat(storage): dynamic read request stall timeout (#10958)
- Adding a new storage client option to enable pro-actively retry for stalled request based on dynamic timeout. - Added emulator test to validated the changes.
1 parent 4c98f7a commit a09f00e

File tree

7 files changed

+408
-40
lines changed

7 files changed

+408
-40
lines changed

storage/client_test.go

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package storage
1616

1717
import (
18+
"bytes"
1819
"context"
1920
"errors"
2021
"fmt"
22+
"io"
2123
"log"
2224
"net/url"
2325
"os"
@@ -27,6 +29,7 @@ import (
2729
"time"
2830

2931
"cloud.google.com/go/iam/apiv1/iampb"
32+
"cloud.google.com/go/storage/experimental"
3033
"github.com/google/go-cmp/cmp"
3134
"github.com/googleapis/gax-go/v2"
3235
"github.com/googleapis/gax-go/v2/apierror"
@@ -948,7 +951,6 @@ func initEmulatorClients() func() error {
948951
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
949952
return noopCloser
950953
}
951-
952954
emulatorClients = map[string]storageClient{
953955
"http": httpClient,
954956
"grpc": grpcClient,
@@ -1335,10 +1337,14 @@ func TestObjectConditionsEmulated(t *testing.T) {
13351337
// Test that RetryNever prevents any retries from happening in both transports.
13361338
func TestRetryNeverEmulated(t *testing.T) {
13371339
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1340+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
1341+
if err != nil {
1342+
t.Fatalf("creating bucket: %v", err)
1343+
}
13381344
instructions := map[string][]string{"storage.buckets.get": {"return-503"}}
1339-
testID := createRetryTest(t, project, bucket, client, instructions)
1345+
testID := createRetryTest(t, client, instructions)
13401346
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1341-
_, err := client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
1347+
_, err = client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
13421348

13431349
var ae *apierror.APIError
13441350
if errors.As(err, &ae) {
@@ -1354,12 +1360,16 @@ func TestRetryNeverEmulated(t *testing.T) {
13541360
// Test that errors are wrapped correctly if retry happens until a timeout.
13551361
func TestRetryTimeoutEmulated(t *testing.T) {
13561362
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1363+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
1364+
if err != nil {
1365+
t.Fatalf("creating bucket: %v", err)
1366+
}
13571367
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
1358-
testID := createRetryTest(t, project, bucket, client, instructions)
1368+
testID := createRetryTest(t, client, instructions)
13591369
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
13601370
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
13611371
defer cancel()
1362-
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true))
1372+
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true))
13631373

13641374
var ae *apierror.APIError
13651375
if errors.As(err, &ae) {
@@ -1379,11 +1389,15 @@ func TestRetryTimeoutEmulated(t *testing.T) {
13791389
// Test that errors are wrapped correctly if retry happens until max attempts.
13801390
func TestRetryMaxAttemptsEmulated(t *testing.T) {
13811391
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1392+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
1393+
if err != nil {
1394+
t.Fatalf("creating bucket: %v", err)
1395+
}
13821396
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
1383-
testID := createRetryTest(t, project, bucket, client, instructions)
1397+
testID := createRetryTest(t, client, instructions)
13841398
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
13851399
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1386-
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
1400+
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
13871401

13881402
var ae *apierror.APIError
13891403
if errors.As(err, &ae) {
@@ -1426,8 +1440,12 @@ func TestTimeoutErrorEmulated(t *testing.T) {
14261440
// Test that server-side DEADLINE_EXCEEDED errors are retried as expected with gRPC.
14271441
func TestRetryDeadlineExceedeEmulated(t *testing.T) {
14281442
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1443+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
1444+
if err != nil {
1445+
t.Fatalf("creating bucket: %v", err)
1446+
}
14291447
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
1430-
testID := createRetryTest(t, project, bucket, client, instructions)
1448+
testID := createRetryTest(t, client, instructions)
14311449
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
14321450
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
14331451
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
@@ -1436,17 +1454,61 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
14361454
})
14371455
}
14381456

1457+
// Test validates the retry for stalled read-request, when client is created with
1458+
// WithReadStallTimeout.
1459+
func TestRetryReadReqStallEmulated(t *testing.T) {
1460+
multiTransportTest(skipJSONReads(skipGRPC("not supported"), "not supported"), t, func(t *testing.T, ctx context.Context, project, _ string, client *Client) {
1461+
// Setup bucket and upload object.
1462+
bucket := fmt.Sprintf("http-bucket-%d", time.Now().Nanosecond())
1463+
if _, err := client.tc.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}, nil); err != nil {
1464+
t.Fatalf("client.CreateBucket: %v", err)
1465+
}
1466+
1467+
name, _, _, err := createObjectWithContent(ctx, bucket, randomBytes3MiB)
1468+
if err != nil {
1469+
t.Fatalf("createObject: %v", err)
1470+
}
1471+
1472+
// Plant stall at start for 2s.
1473+
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
1474+
testID := createRetryTest(t, client.tc, instructions)
1475+
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1476+
1477+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
1478+
defer cancel()
1479+
1480+
r, err := client.tc.NewRangeReader(ctx, &newRangeReaderParams{
1481+
bucket: bucket,
1482+
object: name,
1483+
gen: defaultGen,
1484+
offset: 0,
1485+
length: -1,
1486+
}, idempotent(true))
1487+
if err != nil {
1488+
t.Fatalf("NewRangeReader: %v", err)
1489+
}
1490+
defer r.Close()
1491+
1492+
buf := &bytes.Buffer{}
1493+
if _, err := io.Copy(buf, r); err != nil {
1494+
t.Fatalf("io.Copy: %v", err)
1495+
}
1496+
if !bytes.Equal(buf.Bytes(), randomBytes3MiB) {
1497+
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
1498+
}
1499+
1500+
}, experimental.WithReadStallTimeout(
1501+
&experimental.ReadStallTimeoutConfig{
1502+
TargetPercentile: 0.99,
1503+
Min: time.Second,
1504+
}))
1505+
}
1506+
14391507
// createRetryTest creates a bucket in the emulator and sets up a test using the
14401508
// Retry Test API for the given instructions. This is intended for emulator tests
14411509
// of retry behavior that are not covered by conformance tests.
1442-
func createRetryTest(t *testing.T, project, bucket string, client storageClient, instructions map[string][]string) string {
1510+
func createRetryTest(t *testing.T, client storageClient, instructions map[string][]string) string {
14431511
t.Helper()
1444-
ctx := context.Background()
1445-
1446-
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
1447-
if err != nil {
1448-
t.Fatalf("creating bucket: %v", err)
1449-
}
14501512

14511513
// Need the HTTP hostname to set up a retry test, as well as knowledge of
14521514
// underlying transport to specify instructions.
@@ -1470,14 +1532,20 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
14701532
return et.id
14711533
}
14721534

1473-
// createObject creates an object in the emulator and returns its name, generation, and
1474-
// metageneration.
1535+
// createObject creates an object in the emulator with content randomBytesToWrite and
1536+
// returns its name, generation, and metageneration.
14751537
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
1538+
return createObjectWithContent(ctx, bucket, randomBytesToWrite)
1539+
}
1540+
1541+
// createObject creates an object in the emulator with the provided []byte contents,
1542+
// and returns its name, generation, and metageneration.
1543+
func createObjectWithContent(ctx context.Context, bucket string, bytes []byte) (string, int64, int64, error) {
14761544
prefix := time.Now().Nanosecond()
14771545
objName := fmt.Sprintf("%d-object", prefix)
14781546

14791547
w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
1480-
if _, err := w.Write(randomBytesToWrite); err != nil {
1548+
if _, err := w.Write(bytes); err != nil {
14811549
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
14821550
}
14831551
if err := w.Close(); err != nil {

storage/experimental/experimental.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package experimental is a collection of experimental features that might
16+
// have some rough edges to them. Housing experimental features in this package
17+
// results in a user accessing these APIs as `experimental.Foo`, thereby making
18+
// it explicit that the feature is experimental and using them in production
19+
// code is at their own risk.
20+
//
21+
// All APIs in this package are experimental.
22+
package experimental
23+
24+
import (
25+
"time"
26+
27+
"cloud.google.com/go/storage/internal"
28+
"google.golang.org/api/option"
29+
)
30+
31+
// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient].
32+
// It enables the client to retry stalled requests when starting a download from
33+
// Cloud Storage. If the timeout elapses with no response from the server, the request
34+
// is automatically retried.
35+
// The timeout is initially set to ReadStallTimeoutConfig.Min. The client tracks
36+
// latency across all read requests from the client, and can adjust the timeout higher
37+
// to the target percentile when latency from the server is high.
38+
// Currently, this is supported only for downloads ([storage.NewReader] and
39+
// [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON)
40+
// will be supported soon.
41+
func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption {
42+
// TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle.
43+
// Currently, dynamicTimeout is kept at the client and hence shared across all the
44+
// BucketHandle, which is not the ideal state. As latency depends on location of VM
45+
// and Bucket, and read latency of different buckets may lie in different range.
46+
// Hence having a separate dynamicTimeout instance at BucketHandle level will
47+
// be better.
48+
return internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)(rstc)
49+
}
50+
51+
// ReadStallTimeoutConfig defines the timeout which is adjusted dynamically based on
52+
// past observed latencies.
53+
type ReadStallTimeoutConfig struct {
54+
// Min is the minimum duration of the timeout. The default value is 500ms. Requests
55+
// taking shorter than this value to return response headers will never time out.
56+
// In general, you should choose a Min value that is greater than the typical value
57+
// for the target percentile.
58+
Min time.Duration
59+
60+
// TargetPercentile is the percentile to target for the dynamic timeout. The default
61+
// value is 0.99. At the default percentile, at most 1% of requests will be timed out
62+
// and retried.
63+
TargetPercentile float64
64+
}

storage/http_client.go

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"hash/crc32"
2323
"io"
2424
"io/ioutil"
25+
"log"
2526
"net/http"
2627
"net/url"
2728
"os"
@@ -47,13 +48,14 @@ import (
4748
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
4849
// storageClient interface.
4950
type httpStorageClient struct {
50-
creds *google.Credentials
51-
hc *http.Client
52-
xmlHost string
53-
raw *raw.Service
54-
scheme string
55-
settings *settings
56-
config *storageConfig
51+
creds *google.Credentials
52+
hc *http.Client
53+
xmlHost string
54+
raw *raw.Service
55+
scheme string
56+
settings *settings
57+
config *storageConfig
58+
dynamicReadReqStallTimeout *dynamicDelay
5759
}
5860

5961
// newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
@@ -128,14 +130,29 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
128130
return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
129131
}
130132

133+
var dd *dynamicDelay
134+
if config.readStallTimeoutConfig != nil {
135+
drrstConfig := config.readStallTimeoutConfig
136+
dd, err = newDynamicDelay(
137+
drrstConfig.TargetPercentile,
138+
getDynamicReadReqIncreaseRateFromEnv(),
139+
getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
140+
drrstConfig.Min,
141+
defaultDynamicReqdReqMaxTimeout)
142+
if err != nil {
143+
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
144+
}
145+
}
146+
131147
return &httpStorageClient{
132-
creds: creds,
133-
hc: hc,
134-
xmlHost: u.Host,
135-
raw: rawService,
136-
scheme: u.Scheme,
137-
settings: s,
138-
config: &config,
148+
creds: creds,
149+
hc: hc,
150+
xmlHost: u.Host,
151+
raw: rawService,
152+
scheme: u.Scheme,
153+
settings: s,
154+
config: &config,
155+
dynamicReadReqStallTimeout: dd,
139156
}, nil
140157
}
141158

@@ -858,7 +875,46 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
858875
reopen := readerReopen(ctx, req.Header, params, s,
859876
func(ctx context.Context) (*http.Response, error) {
860877
setHeadersFromCtx(ctx, req.Header)
861-
return c.hc.Do(req.WithContext(ctx))
878+
879+
if c.dynamicReadReqStallTimeout == nil {
880+
return c.hc.Do(req.WithContext(ctx))
881+
}
882+
883+
cancelCtx, cancel := context.WithCancel(ctx)
884+
var (
885+
res *http.Response
886+
err error
887+
)
888+
889+
done := make(chan bool)
890+
go func() {
891+
reqStartTime := time.Now()
892+
res, err = c.hc.Do(req.WithContext(cancelCtx))
893+
if err == nil {
894+
reqLatency := time.Since(reqStartTime)
895+
c.dynamicReadReqStallTimeout.update(reqLatency)
896+
} else if errors.Is(err, context.Canceled) {
897+
// context.Canceled means operation took more than current dynamicTimeout,
898+
// hence should be increased.
899+
c.dynamicReadReqStallTimeout.increase()
900+
}
901+
done <- true
902+
}()
903+
904+
// Wait until timeout or request is successful.
905+
timer := time.After(c.dynamicReadReqStallTimeout.getValue())
906+
select {
907+
case <-timer:
908+
log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue().Seconds())
909+
cancel()
910+
err = context.DeadlineExceeded
911+
if res != nil && res.Body != nil {
912+
res.Body.Close()
913+
}
914+
case <-done:
915+
cancel = nil
916+
}
917+
return res, err
862918
},
863919
func() error { return setConditionsHeaders(req.Header, params.conds) },
864920
func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })

storage/integration_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ import (
5959
itesting "google.golang.org/api/iterator/testing"
6060
"google.golang.org/api/option"
6161
"google.golang.org/api/transport"
62-
"google.golang.org/grpc"
6362
"google.golang.org/grpc/codes"
6463
"google.golang.org/grpc/status"
6564
)
@@ -93,7 +92,6 @@ var (
9392
)
9493

9594
func TestMain(m *testing.M) {
96-
grpc.EnableTracing = true
9795
cleanup := initIntegrationTest()
9896
cleanupEmulatorClients := initEmulatorClients()
9997
exit := m.Run()

0 commit comments

Comments
 (0)