Skip to content

Commit 7b7c4cd

Browse files
authored
Merge pull request #2394 from kubernetes-sigs/clean-azcopy-jobs
fix: cleanup azcopy jobs after job complete
2 parents 3eaefde + 0a59245 commit 7b7c4cd

File tree

4 files changed

+60
-28
lines changed

4 files changed

+60
-28
lines changed

pkg/azurefile/azurefile.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func NewDriver(options *DriverOptions) *Driver {
325325
driver.volLockMap = newLockMap()
326326
driver.subnetLockMap = newLockMap()
327327
driver.volumeLocks = newVolumeLocks()
328-
driver.azcopy = &fileutil.Azcopy{}
328+
driver.azcopy = &fileutil.Azcopy{ExecCmd: &fileutil.ExecCommand{}}
329329
driver.kubeconfig = options.KubeConfig
330330
driver.endpoint = options.Endpoint
331331
driver.resolver = new(NetResolver)

pkg/azurefile/controllerserver.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"strings"
2727
"time"
2828

29-
volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util"
29+
"sigs.k8s.io/azurefile-csi-driver/pkg/util"
3030

3131
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
3232
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
@@ -95,7 +95,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
9595
}
9696

9797
capacityBytes := req.GetCapacityRange().GetRequiredBytes()
98-
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
98+
requestGiB := util.RoundUpGiB(capacityBytes)
9999
if requestGiB == 0 {
100100
requestGiB = defaultAzureFileQuota
101101
klog.Warningf("no quota specified, set as default value(%d GiB)", defaultAzureFileQuota)
@@ -642,7 +642,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
642642
// use uuid as vhd disk name if file share specified
643643
diskName = uuid.NewString() + vhdSuffix
644644
}
645-
diskSizeBytes := volumehelper.GiBToBytes(requestGiB)
645+
diskSizeBytes := util.GiBToBytes(requestGiB)
646646
klog.V(2).Infof("begin to create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s)",
647647
diskName, diskSizeBytes, validFileShareName, account, sku, resourceGroup, location)
648648
if err := createDisk(ctx, accountName, accountKey, d.getStorageEndPointSuffix(), validFileShareName, diskName, diskSizeBytes); err != nil {
@@ -914,7 +914,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
914914
klog.V(2).Infof("snapshot(%s) already exists", snapshotName)
915915
return &csi.CreateSnapshotResponse{
916916
Snapshot: &csi.Snapshot{
917-
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
917+
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
918918
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
919919
SourceVolumeId: sourceVolumeID,
920920
CreationTime: timestamppb.New(itemSnapshotTime),
@@ -996,7 +996,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
996996

997997
createResp := &csi.CreateSnapshotResponse{
998998
Snapshot: &csi.Snapshot{
999-
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
999+
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
10001000
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
10011001
SourceVolumeId: sourceVolumeID,
10021002
CreationTime: timestamppb.New(itemSnapshotTime),
@@ -1134,21 +1134,21 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
11341134
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
11351135

11361136
switch jobState {
1137-
case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted:
1137+
case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
11381138
return err
1139-
case volumehelper.AzcopyJobRunning:
1139+
case util.AzcopyJobRunning:
11401140
err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
11411141
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
11421142
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
11431143
if err != nil {
11441144
return false, err
11451145
}
1146-
if jobState == volumehelper.AzcopyJobRunning {
1146+
if jobState == util.AzcopyJobRunning {
11471147
return false, nil
11481148
}
11491149
return true, nil
11501150
})
1151-
case volumehelper.AzcopyJobNotFound:
1151+
case util.AzcopyJobNotFound:
11521152
klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName)
11531153
execAzcopyJob := func() error {
11541154
if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil {
@@ -1160,13 +1160,16 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
11601160
jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
11611161
return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent)
11621162
}
1163-
err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
1163+
err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
11641164
}
11651165

11661166
if err != nil {
11671167
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err)
11681168
} else {
11691169
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
1170+
if out, err := d.azcopy.CleanJobs(); err != nil {
1171+
klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
1172+
}
11701173
}
11711174
return err
11721175
}
@@ -1191,7 +1194,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
11911194
if capacityBytes == 0 {
11921195
return nil, status.Error(codes.InvalidArgument, "volume capacity range missing in request")
11931196
}
1194-
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
1197+
requestGiB := util.RoundUpGiB(capacityBytes)
11951198
if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
11961199
return nil, status.Errorf(codes.InvalidArgument, "invalid expand volume request: %v", req)
11971200
}

pkg/util/util.go

+9-14
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@ const (
3434
type AzcopyJobState string
3535

3636
const (
37-
AzcopyJobError AzcopyJobState = "Error"
38-
AzcopyJobNotFound AzcopyJobState = "NotFound"
39-
AzcopyJobRunning AzcopyJobState = "Running"
40-
AzcopyJobCompleted AzcopyJobState = "Completed"
37+
AzcopyJobError AzcopyJobState = "Error"
38+
AzcopyJobNotFound AzcopyJobState = "NotFound"
39+
AzcopyJobRunning AzcopyJobState = "Running"
40+
AzcopyJobCompleted AzcopyJobState = "Completed"
41+
AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors"
42+
AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped"
43+
AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped"
4144
)
4245

4346
// RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB
@@ -107,9 +110,6 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
107110
// Start Time: Wednesday, 09-Aug-23 09:09:03 UTC
108111
// Status: Cancelled
109112
// Command: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false
110-
if ac.ExecCmd == nil {
111-
ac.ExecCmd = &ExecCommand{}
112-
}
113113
out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
114114
// if grep command returns nothing, the exec will return exit status 1 error, so filter this error
115115
if err != nil && err.Error() != "exit status 1" {
@@ -143,13 +143,8 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
143143
return jobState, percent, nil
144144
}
145145

146-
// TestListJobs test azcopy jobs list command with authAzcopyEnv
147-
func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) {
148-
cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.file.%s", accountName, storageEndpointSuffix))
149-
if ac.ExecCmd == nil {
150-
ac.ExecCmd = &ExecCommand{}
151-
}
152-
return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
146+
func (ac *Azcopy) CleanJobs() (string, error) {
147+
return ac.ExecCmd.RunCommand("azcopy jobs clean", nil)
153148
}
154149

155150
// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist

pkg/util/util_test.go

+36-2
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,49 @@ func TestGetAzcopyJob(t *testing.T) {
162162
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), []string{}).Return(test.showStr, test.showErr)
163163
}
164164

165-
azcopyFunc := &Azcopy{}
166-
azcopyFunc.ExecCmd = m
165+
azcopyFunc := &Azcopy{ExecCmd: m}
167166
jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{})
168167
if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) {
169168
t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr)
170169
}
171170
}
172171
}
173172

173+
func TestCleanJobs(t *testing.T) {
174+
tests := []struct {
175+
desc string
176+
execStr string
177+
execErr error
178+
expectedErr error
179+
}{
180+
{
181+
desc: "run exec get error",
182+
execStr: "",
183+
execErr: fmt.Errorf("error"),
184+
expectedErr: fmt.Errorf("error"),
185+
},
186+
{
187+
desc: "run exec succeed",
188+
execStr: "cleaned",
189+
execErr: nil,
190+
expectedErr: nil,
191+
},
192+
}
193+
for _, test := range tests {
194+
ctrl := gomock.NewController(t)
195+
defer ctrl.Finish()
196+
197+
m := NewMockEXEC(ctrl)
198+
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs clean"), nil).Return(test.execStr, test.execErr)
199+
200+
azcopyFunc := &Azcopy{ExecCmd: m}
201+
_, err := azcopyFunc.CleanJobs()
202+
if !reflect.DeepEqual(err, test.expectedErr) {
203+
t.Errorf("test[%s]: unexpected err: %v, expected err: %v", test.desc, err, test.expectedErr)
204+
}
205+
}
206+
}
207+
174208
func TestParseAzcopyJobList(t *testing.T) {
175209
tests := []struct {
176210
desc string

0 commit comments

Comments
 (0)