Skip to content

Commit 09fa125

Browse files
authored
feat(gensupport): per-chunk transfer timeout configs (#2865)
feat(gensupport): per-chunk transfer timeout configs Allow users to configure the per-chunk transfer timeout for retries that's used during resumable uploads. Needs to be exposed via the manual layer for storage. Tested the feature(with default timeout and with some random value) with storagetestbench server emulator.
1 parent 44435a9 commit 09fa125

File tree

5 files changed

+82
-16
lines changed

5 files changed

+82
-16
lines changed

googleapi/googleapi.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,20 @@ func ChunkSize(size int) MediaOption {
259259
return chunkSizeOption(size)
260260
}
261261

262+
type chunkTransferTimeoutOption time.Duration
263+
264+
func (cd chunkTransferTimeoutOption) setOptions(o *MediaOptions) {
265+
o.ChunkTransferTimeout = time.Duration(cd)
266+
}
267+
268+
// ChunkTransferTimeout returns a MediaOption which sets a per-chunk
269+
// transfer timeout for resumable uploads. If a single chunk has been
270+
// attempting to upload for longer than this time then the old req got canceled and retried.
271+
// The default is no timeout for the request.
272+
func ChunkTransferTimeout(timeout time.Duration) MediaOption {
273+
return chunkTransferTimeoutOption(timeout)
274+
}
275+
262276
type chunkRetryDeadlineOption time.Duration
263277

264278
func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) {
@@ -283,6 +297,7 @@ type MediaOptions struct {
283297
ForceEmptyContentType bool
284298
ChunkSize int
285299
ChunkRetryDeadline time.Duration
300+
ChunkTransferTimeout time.Duration
286301
}
287302

288303
// ProcessMediaOptions stores options from opts in a MediaOptions.

internal/gensupport/media.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,14 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer
135135
// code only.
136136
type MediaInfo struct {
137137
// At most one of Media and MediaBuffer will be set.
138-
media io.Reader
139-
buffer *MediaBuffer
140-
singleChunk bool
141-
mType string
142-
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
143-
progressUpdater googleapi.ProgressUpdater
144-
chunkRetryDeadline time.Duration
138+
media io.Reader
139+
buffer *MediaBuffer
140+
singleChunk bool
141+
mType string
142+
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
143+
progressUpdater googleapi.ProgressUpdater
144+
chunkRetryDeadline time.Duration
145+
chunkTransferTimeout time.Duration
145146
}
146147

147148
// NewInfoFromMedia should be invoked from the Media method of a call. It returns a
@@ -157,6 +158,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
157158
}
158159
}
159160
mi.chunkRetryDeadline = opts.ChunkRetryDeadline
161+
mi.chunkTransferTimeout = opts.ChunkTransferTimeout
160162
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
161163
return mi
162164
}
@@ -294,7 +296,8 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
294296
mi.progressUpdater(curr, mi.size)
295297
}
296298
},
297-
ChunkRetryDeadline: mi.chunkRetryDeadline,
299+
ChunkRetryDeadline: mi.chunkRetryDeadline,
300+
ChunkTransferTimeout: mi.chunkTransferTimeout,
298301
}
299302
}
300303

internal/gensupport/media_test.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,13 @@ func TestUploadRequestGetBody(t *testing.T) {
218218

219219
func TestResumableUpload(t *testing.T) {
220220
for _, test := range []struct {
221-
desc string
222-
r io.Reader
223-
chunkSize int
224-
wantUploadType string
225-
wantResumableUpload bool
226-
chunkRetryDeadline time.Duration
221+
desc string
222+
r io.Reader
223+
chunkSize int
224+
wantUploadType string
225+
wantResumableUpload bool
226+
chunkRetryDeadline time.Duration
227+
chunkTransferTimeOut time.Duration
227228
}{
228229
{
229230
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
@@ -263,11 +264,22 @@ func TestResumableUpload(t *testing.T) {
263264
wantResumableUpload: true,
264265
chunkRetryDeadline: 1 * time.Second,
265266
},
267+
{
268+
desc: "confirm that ChunkTransferTimeout is carried to ResumableUpload",
269+
r: &nullReader{2 * googleapi.MinUploadChunkSize},
270+
chunkSize: 1,
271+
wantUploadType: "resumable",
272+
wantResumableUpload: true,
273+
chunkTransferTimeOut: 5 * time.Second,
274+
},
266275
} {
267276
opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)}
268277
if test.chunkRetryDeadline != 0 {
269278
opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline))
270279
}
280+
if test.chunkTransferTimeOut != 0 {
281+
opts = append(opts, googleapi.ChunkTransferTimeout(test.chunkTransferTimeOut))
282+
}
271283
mi := NewInfoFromMedia(test.r, opts)
272284
if got, want := mi.UploadType(), test.wantUploadType; got != want {
273285
t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want)
@@ -284,6 +296,15 @@ func TestResumableUpload(t *testing.T) {
284296
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
285297
}
286298
}
299+
if test.chunkTransferTimeOut != 0 {
300+
if got := mi.ResumableUpload(""); got != nil {
301+
if got.ChunkTransferTimeout != test.chunkTransferTimeOut {
302+
t.Errorf("%s: ChunkTransferTimeout: got %v, want %v", test.desc, got.ChunkTransferTimeout, test.chunkTransferTimeOut)
303+
}
304+
} else {
305+
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
306+
}
307+
}
287308
}
288309
}
289310

internal/gensupport/resumable.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ type ResumableUpload struct {
4343
// retries should happen.
4444
ChunkRetryDeadline time.Duration
4545

46+
// ChunkTransferTimeout configures the per-chunk transfer timeout. If a chunk upload stalls for longer than
47+
// this duration, the upload will be retried.
48+
ChunkTransferTimeout time.Duration
49+
4650
// Track current request invocation ID and attempt count for retry metrics
4751
// and idempotency headers.
4852
invocationID string
@@ -241,13 +245,35 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
241245
default:
242246
}
243247

244-
resp, err = rx.transferChunk(ctx)
248+
// rCtx is derived from a context with a defined transferTimeout with non-zero value.
249+
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
250+
// triggering a retry of the request.
251+
var rCtx context.Context
252+
var cancel context.CancelFunc
253+
254+
rCtx = ctx
255+
if rx.ChunkTransferTimeout != 0 {
256+
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
257+
}
258+
259+
resp, err = rx.transferChunk(rCtx)
245260

246261
var status int
247262
if resp != nil {
248263
status = resp.StatusCode
249264
}
250265

266+
// The upload should be retried if the rCtx is canceled due to a timeout.
267+
select {
268+
case <-rCtx.Done():
269+
if errors.Is(rCtx.Err(), context.DeadlineExceeded) {
270+
// Cancel the context for rCtx
271+
cancel()
272+
continue
273+
}
274+
default:
275+
}
276+
251277
// Check if we should retry the request.
252278
if !errorFunc(status, err) {
253279
quitAfterTimer.Stop()

internal/gensupport/resumable_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package gensupport
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"io"
1112
"net/http"
@@ -280,7 +281,7 @@ func TestCancelUploadBasic(t *testing.T) {
280281
defer func() { backoff = oldBackoff }()
281282

282283
res, err := rx.Upload(ctx)
283-
if err != context.Canceled {
284+
if !errors.Is(err, context.Canceled) {
284285
t.Fatalf("Upload err: got: %v; want: context cancelled", err)
285286
}
286287
if res != nil {

0 commit comments

Comments
 (0)