Skip to content

Commit fd12e4c

Browse files
authored
Merge pull request #1758 from k8s-infra-cherrypick-robot/cherry-pick-1751-to-release-1.30
[release-1.30] fix: fix azcopy exec timeout func
2 parents 551714b + 90d3fa1 commit fd12e4c

File tree

6 files changed

+128
-47
lines changed

6 files changed

+128
-47
lines changed

pkg/azurefile/azurefile.go

+41-39
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,6 @@ const (
187187
SnapshotID = "snapshot_id"
188188

189189
FSGroupChangeNone = "None"
190-
191-
waitForAzCopyInterval = 2 * time.Second
192190
)
193191

194192
var (
@@ -1011,55 +1009,59 @@ func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest
10111009
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
10121010
}
10131011

1014-
timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
1015-
timeTick := time.Tick(waitForAzCopyInterval)
10161012
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSASToken)
10171013
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSASToken)
10181014

10191015
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
10201016
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
1021-
if jobState == fileutil.AzcopyJobError || jobState == fileutil.AzcopyJobCompleted {
1017+
switch jobState {
1018+
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
10221019
return err
1023-
}
1024-
klog.V(2).Infof("begin to copy fileshare %s to %s", srcFileShareName, dstFileShareName)
1025-
for {
1026-
select {
1027-
case <-timeTick:
1028-
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
1029-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
1030-
switch jobState {
1031-
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
1020+
case fileutil.AzcopyJobRunning:
1021+
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1022+
case fileutil.AzcopyJobNotFound:
1023+
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
1024+
execFuncWithAuth := func() error {
1025+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
1026+
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
1027+
if len(authAzcopyEnv) > 0 {
1028+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
1029+
}
1030+
if out, err := cmd.CombinedOutput(); err != nil {
1031+
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
1032+
}
1033+
return nil
1034+
}
1035+
timeoutFunc := func() error {
1036+
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
1037+
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
1038+
}
1039+
copyErr := fileutil.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc)
1040+
if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
1041+
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
1042+
d.azcopySasTokenCache.Set(accountName, "")
1043+
var sasToken string
1044+
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
10321045
return err
1033-
case fileutil.AzcopyJobNotFound:
1034-
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
1035-
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
1046+
}
1047+
execFuncWithSasToken := func() error {
1048+
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
10361049
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
1037-
if len(authAzcopyEnv) > 0 {
1038-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
1050+
if out, err := cmd.CombinedOutput(); err != nil {
1051+
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
10391052
}
1040-
out, copyErr := cmd.CombinedOutput()
1041-
if accountSASToken == "" && strings.Contains(string(out), authorizationPermissionMismatch) && copyErr != nil {
1042-
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original output: %v", string(out))
1043-
d.azcopySasTokenCache.Set(accountName, "")
1044-
var sasToken string
1045-
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
1046-
return err
1047-
}
1048-
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
1049-
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
1050-
out, copyErr = cmd.CombinedOutput()
1051-
}
1052-
if copyErr != nil {
1053-
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstFileShareName, copyErr, string(out))
1054-
} else {
1055-
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
1056-
}
1057-
return copyErr
1053+
return nil
10581054
}
1059-
case <-timeAfter:
1060-
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed", srcFileShareName, dstFileShareName)
1055+
copyErr = fileutil.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithSasToken, timeoutFunc)
10611056
}
1057+
if copyErr != nil {
1058+
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstFileShareName, copyErr)
1059+
} else {
1060+
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
1061+
}
1062+
return copyErr
10621063
}
1064+
return err
10631065
}
10641066

10651067
// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares

pkg/azurefile/controllerserver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
117117
// logging the job status if it's volume cloning
118118
if req.GetVolumeContentSource() != nil {
119119
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
120-
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
120+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
121121
}
122122
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
123123
}

pkg/azurefile/controllerserver_test.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -1890,7 +1890,7 @@ func TestCopyVolume(t *testing.T) {
18901890
},
18911891
},
18921892
{
1893-
name: "azcopy job is first in progress and then be completed",
1893+
name: "azcopy job is in progress",
18941894
testFunc: func(t *testing.T) {
18951895
d := NewFakeDriver()
18961896
mp := map[string]string{}
@@ -1919,15 +1919,12 @@ func TestCopyVolume(t *testing.T) {
19191919

19201920
m := util.NewMockEXEC(ctrl)
19211921
listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
1922-
listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
1923-
o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
1922+
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
19241923
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil)
1925-
o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr2, nil)
1926-
gomock.InOrder(o1, o2)
19271924

19281925
d.azcopy.ExecCmd = m
19291926

1930-
var expectedErr error
1927+
expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%")
19311928
err := d.copyVolume(ctx, req, "sastoken", []string{}, "", "", secret, &fileclient.ShareOptions{Name: "dstFileshare"}, nil, "core.windows.net")
19321929
if !reflect.DeepEqual(err, expectedErr) {
19331930
t.Errorf("Unexpected error: %v", err)

pkg/azurefile/volume_lock.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
26+
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
27+
volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v"
2728
)
2829

2930
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs

pkg/util/util.go

+28
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os/exec"
2323
"strings"
2424
"sync"
25+
"time"
2526

2627
"k8s.io/klog/v2"
2728
)
@@ -205,3 +206,30 @@ func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) {
205206
}
206207
return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil
207208
}
209+
210+
// ExecFunc runs an operation (for example an external command) and returns
// its error, if any.
type ExecFunc func() (err error)

// TimeoutFunc produces the error to return when an ExecFunc does not finish
// before the timeout expires.
type TimeoutFunc func() (err error)

// WaitForExecCompletion runs execFunc in a separate goroutine and waits for
// it to finish for at most timeout.
//
// If execFunc finishes in time, its error (possibly nil) is returned.
// Otherwise the result of timeoutFunc is returned. On timeout execFunc is NOT
// interrupted: it keeps running in the background and its eventual result is
// discarded.
func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
	// The channel is buffered with one slot so the goroutine below can always
	// deliver its result and exit, even when the timeout branch has already
	// returned and nobody is receiving anymore. With an unbuffered channel the
	// goroutine would block on the send forever after a timeout.
	done := make(chan error, 1)

	// Run the exec function in a goroutine and hand its result over the channel.
	go func() {
		done <- execFunc()
	}()

	// Use an explicit timer so it can be released as soon as we return.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Wait for the function to complete or time out.
	select {
	case err := <-done:
		return err
	case <-timer.C:
		return timeoutFunc()
	}
}

pkg/util/util_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"reflect"
2222
"testing"
23+
"time"
2324

2425
gomock "go.uber.org/mock/gomock"
2526
)
@@ -260,3 +261,55 @@ func TestParseAzcopyJobShow(t *testing.T) {
260261
}
261262
}
262263
}
264+
265+
func TestWaitForExecCompletion(t *testing.T) {
266+
tests := []struct {
267+
desc string
268+
timeout time.Duration
269+
execFunc ExecFunc
270+
timeoutFunc TimeoutFunc
271+
expectedErr error
272+
}{
273+
{
274+
desc: "execFunc returns error",
275+
timeout: 1 * time.Second,
276+
execFunc: func() error {
277+
return fmt.Errorf("execFunc error")
278+
},
279+
timeoutFunc: func() error {
280+
return fmt.Errorf("timeout error")
281+
},
282+
expectedErr: fmt.Errorf("execFunc error"),
283+
},
284+
{
285+
desc: "execFunc timeout",
286+
timeout: 1 * time.Second,
287+
execFunc: func() error {
288+
time.Sleep(2 * time.Second)
289+
return nil
290+
},
291+
timeoutFunc: func() error {
292+
return fmt.Errorf("timeout error")
293+
},
294+
expectedErr: fmt.Errorf("timeout error"),
295+
},
296+
{
297+
desc: "execFunc completed successfully",
298+
timeout: 1 * time.Second,
299+
execFunc: func() error {
300+
return nil
301+
},
302+
timeoutFunc: func() error {
303+
return fmt.Errorf("timeout error")
304+
},
305+
expectedErr: nil,
306+
},
307+
}
308+
309+
for _, test := range tests {
310+
err := WaitForExecCompletion(test.timeout, test.execFunc, test.timeoutFunc)
311+
if err != nil && (err.Error() != test.expectedErr.Error()) {
312+
t.Errorf("unexpected error: %v, expected error: %v", err, test.expectedErr)
313+
}
314+
}
315+
}

0 commit comments

Comments
 (0)