Skip to content

Commit 938f084

Browse files
authored
Merge pull request #1278 from umagnus/add_volume_cloning
feat: support volume cloning
2 parents 4870992 + b71e088 commit 938f084

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+13997
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
apiVersion: v1
3+
kind: PersistentVolumeClaim
4+
metadata:
5+
name: pvc-azurefile-clone
6+
namespace: default
7+
spec:
8+
accessModes:
9+
- ReadWriteMany
10+
resources:
11+
requests:
12+
storage: 100Gi
13+
storageClassName: azurefile-csi
14+
dataSource:
15+
kind: PersistentVolumeClaim
16+
name: pvc-azurefile

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ require (
3939
require (
4040
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.1
4141
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0
42+
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0
4243
github.com/jongio/azidext/go/azidext v0.5.0
4344
github.com/onsi/ginkgo/v2 v2.11.0
4445
golang.org/x/sys v0.12.0

go.sum

+3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybI
4545
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U=
4646
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY=
4747
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM=
48+
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
49+
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0 h1:HIMTmT8prBdCqb2F7NU6ptqF8sy8TASYfDmzDziblJA=
50+
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0/go.mod h1:AjDdvSU6d92BGS2JfdsKi+H/c2vQY3OFp4qhxzsUH8g=
4851
github.com/Azure/azure-storage-file-go v0.8.0 h1:OX8DGsleWLUE6Mw4R/OeWEZMvsTIpwN94J59zqKQnTI=
4952
github.com/Azure/azure-storage-file-go v0.8.0/go.mod h1:3w3mufGcMjcOJ3w+4Gs+5wsSgkT7xDwWWqMMIrXtW4c=
5053
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=

pkg/azurefile/azurefile.go

+70
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"encoding/binary"
2323
"fmt"
2424
"net/url"
25+
"os/exec"
2526
"strconv"
2627
"strings"
2728
"sync"
@@ -32,6 +33,8 @@ import (
3233
"github.com/container-storage-interface/spec/lib/go/csi"
3334
"github.com/pborman/uuid"
3435
"github.com/rubiojr/go-vhd/vhd"
36+
"google.golang.org/grpc/codes"
37+
"google.golang.org/grpc/status"
3538

3639
v1 "k8s.io/api/core/v1"
3740
"k8s.io/apimachinery/pkg/api/errors"
@@ -174,6 +177,9 @@ const (
174177
SnapshotID = "snapshot_id"
175178

176179
FSGroupChangeNone = "None"
180+
181+
waitForCopyInterval = 5 * time.Second
182+
waitForCopyTimeout = 3 * time.Minute
177183
)
178184

179185
var (
@@ -209,6 +215,7 @@ type DriverOptions struct {
209215
SkipMatchingTagCacheExpireInMinutes int
210216
VolStatsCacheExpireInMinutes int
211217
PrintVolumeStatsCallLogs bool
218+
SasTokenExpirationMinutes int
212219
}
213220

214221
// Driver implements all interfaces of CSI drivers
@@ -260,6 +267,8 @@ type Driver struct {
260267
resizeFileShareFailureCache azcache.Resource
261268
// a timed cache storing volume stats <volumeID, volumeStats>
262269
volStatsCache azcache.Resource
270+
// sas expiry time for azcopy in volume clone
271+
sasTokenExpirationMinutes int
263272
}
264273

265274
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -287,6 +296,7 @@ func NewDriver(options *DriverOptions) *Driver {
287296
driver.appendClosetimeoOption = options.AppendClosetimeoOption
288297
driver.appendNoShareSockOption = options.AppendNoShareSockOption
289298
driver.printVolumeStatsCallLogs = options.PrintVolumeStatsCallLogs
299+
driver.sasTokenExpirationMinutes = options.SasTokenExpirationMinutes
290300
driver.volLockMap = newLockMap()
291301
driver.subnetLockMap = newLockMap()
292302
driver.volumeLocks = newVolumeLocks()
@@ -363,6 +373,7 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
363373
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
364374
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
365375
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
376+
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
366377
})
367378
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
368379
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
@@ -889,6 +900,65 @@ func (d *Driver) ResizeFileShare(ctx context.Context, subsID, resourceGroup, acc
889900
})
890901
}
891902

903+
// CopyFileShare copies a fileshare in the same storage account
904+
func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
905+
if shareOptions.Protocol == storage.EnabledProtocolsNFS {
906+
return fmt.Errorf("protocol nfs is not supported for volume cloning")
907+
}
908+
var sourceVolumeID string
909+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
910+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
911+
}
912+
resourceGroupName, accountName, srcFileShareName, _, _, _, err := GetFileShareInfo(sourceVolumeID) //nolint:dogsled
913+
if err != nil {
914+
return status.Error(codes.NotFound, err.Error())
915+
}
916+
dstFileShareName := shareOptions.Name
917+
if srcFileShareName == "" || dstFileShareName == "" {
918+
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
919+
}
920+
921+
klog.V(2).Infof("generate sas token for account(%s)", accountName)
922+
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
923+
if genErr != nil {
924+
return genErr
925+
}
926+
927+
timeAfter := time.After(waitForCopyTimeout)
928+
timeTick := time.Tick(waitForCopyInterval)
929+
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSasToken)
930+
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSasToken)
931+
932+
jobState, percent, err := getAzcopyJob(dstFileShareName)
933+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
934+
if jobState == AzcopyJobError || jobState == AzcopyJobCompleted {
935+
return err
936+
}
937+
klog.V(2).Infof("begin to copy fileshare %s to %s", srcFileShareName, dstFileShareName)
938+
for {
939+
select {
940+
case <-timeTick:
941+
jobState, percent, err := getAzcopyJob(dstFileShareName)
942+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
943+
switch jobState {
944+
case AzcopyJobError, AzcopyJobCompleted:
945+
return err
946+
case AzcopyJobNotFound:
947+
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
948+
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
949+
if copyErr != nil {
950+
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstFileShareName, copyErr, string(out))
951+
} else {
952+
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
953+
}
954+
return copyErr
955+
}
956+
case <-timeAfter:
957+
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed", srcFileShareName, dstFileShareName)
958+
}
959+
}
960+
}
961+
892962
// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares
893963
func (d *Driver) GetTotalAccountQuota(ctx context.Context, subsID, resourceGroup, accountName string) (int32, int32, error) {
894964
fileshares, err := d.cloud.FileClient.WithSubscriptionID(subsID).ListFileShare(ctx, resourceGroup, accountName, "", "")

pkg/azurefile/controllerserver.go

+66-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626

2727
volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util"
2828

29+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
30+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
2931
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
3032
"github.com/Azure/azure-storage-file-go/azfile"
3133
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -104,6 +106,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
104106
}
105107

106108
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
109+
// logging the job status if it's volume cloning
110+
if req.GetVolumeContentSource() != nil {
111+
jobState, percent, err := getAzcopyJob(volName)
112+
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
113+
}
107114
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
108115
}
109116
defer d.volumeLocks.Release(volName)
@@ -516,7 +523,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
516523
}
517524

518525
var volumeID string
519-
mc := metrics.NewMetricContext(azureFileCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, subsID, d.Name)
526+
requestName := "controller_create_volume"
527+
if req.GetVolumeContentSource() != nil {
528+
switch req.VolumeContentSource.Type.(type) {
529+
case *csi.VolumeContentSource_Snapshot:
530+
requestName = "controller_create_volume_from_snapshot"
531+
case *csi.VolumeContentSource_Volume:
532+
requestName = "controller_create_volume_from_volume"
533+
}
534+
}
535+
mc := metrics.NewMetricContext(azureFileCSIDriverName, requestName, d.cloud.ResourceGroup, subsID, d.Name)
520536
isOperationSucceeded := false
521537
defer func() {
522538
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
@@ -543,9 +559,20 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
543559
}
544560
return nil, status.Errorf(codes.Internal, "failed to create file share(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, subsID, resourceGroup, location, fileShareSize, err)
545561
}
562+
if req.GetVolumeContentSource() != nil {
563+
accountKeyCopy, err := d.GetStorageAccesskey(ctx, accountOptions, req.GetSecrets(), secretName, secretNamespace)
564+
if err != nil {
565+
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
566+
}
567+
if err := d.copyVolume(ctx, req, accountKeyCopy, shareOptions, storageEndpointSuffix); err != nil {
568+
return nil, err
569+
}
570+
// storeAccountKey is not needed here since copy volume is only using SAS token
571+
storeAccountKey = false
572+
}
546573
klog.V(2).Infof("create file share %s on storage account %s successfully", validFileShareName, accountName)
547574

548-
if isDiskFsType(fsType) && !strings.HasSuffix(diskName, vhdSuffix) {
575+
if isDiskFsType(fsType) && !strings.HasSuffix(diskName, vhdSuffix) && req.GetVolumeContentSource() == nil {
549576
if accountKey == "" {
550577
if accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, req.GetSecrets(), secretName, secretNamespace); err != nil {
551578
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
@@ -619,6 +646,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
619646
VolumeId: volumeID,
620647
CapacityBytes: capacityBytes,
621648
VolumeContext: parameters,
649+
ContentSource: req.GetVolumeContentSource(),
622650
},
623651
}, nil
624652
}
@@ -686,6 +714,18 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
686714
return &csi.DeleteVolumeResponse{}, nil
687715
}
688716

717+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
718+
vs := req.VolumeContentSource
719+
switch vs.Type.(type) {
720+
case *csi.VolumeContentSource_Snapshot:
721+
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
722+
case *csi.VolumeContentSource_Volume:
723+
return d.copyFileShare(ctx, req, accountKey, shareOptions, storageEndpointSuffix)
724+
default:
725+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
726+
}
727+
}
728+
689729
// ControllerGetVolume get volume
690730
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
691731
return nil, status.Error(codes.Unimplemented, "")
@@ -1243,3 +1283,27 @@ func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
12431283
}
12441284
return nil
12451285
}
1286+
1287+
func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
1288+
credential, err := service.NewSharedKeyCredential(accountName, accountKey)
1289+
if err != nil {
1290+
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
1291+
}
1292+
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.file.%s/", accountName, storageEndpointSuffix), credential, nil)
1293+
if err != nil {
1294+
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
1295+
}
1296+
nowTime := time.Now()
1297+
sasURL, err := serviceClient.GetSASURL(
1298+
sas.AccountResourceTypes{Object: true, Service: true, Container: true},
1299+
sas.AccountPermissions{Read: true, List: true, Write: true},
1300+
time.Now().Add(time.Duration(expiryTime)*time.Minute), &service.GetSASURLOptions{StartTime: &nowTime})
1301+
if err != nil {
1302+
return "", err
1303+
}
1304+
u, err := url.Parse(sasURL)
1305+
if err != nil {
1306+
return "", err
1307+
}
1308+
return "?" + u.RawQuery, nil
1309+
}

0 commit comments

Comments
 (0)