Skip to content

feat: support volume cloning #1278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions deploy/example/pvc-azurefile-csi-clone.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-azurefile-clone
namespace: default
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
storageClassName: azurefile-csi
dataSource:
kind: PersistentVolumeClaim
name: pvc-azurefile
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0
github.com/jongio/azidext/go/azidext v0.5.0
github.com/onsi/ginkgo/v2 v2.11.0
golang.org/x/sys v0.12.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybI
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0 h1:HIMTmT8prBdCqb2F7NU6ptqF8sy8TASYfDmzDziblJA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v0.1.0/go.mod h1:AjDdvSU6d92BGS2JfdsKi+H/c2vQY3OFp4qhxzsUH8g=
github.com/Azure/azure-storage-file-go v0.8.0 h1:OX8DGsleWLUE6Mw4R/OeWEZMvsTIpwN94J59zqKQnTI=
github.com/Azure/azure-storage-file-go v0.8.0/go.mod h1:3w3mufGcMjcOJ3w+4Gs+5wsSgkT7xDwWWqMMIrXtW4c=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
Expand Down
70 changes: 70 additions & 0 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/binary"
"fmt"
"net/url"
"os/exec"
"strconv"
"strings"
"sync"
Expand All @@ -32,6 +33,8 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
"github.com/rubiojr/go-vhd/vhd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -174,6 +177,9 @@ const (
SnapshotID = "snapshot_id"

FSGroupChangeNone = "None"

waitForCopyInterval = 5 * time.Second
waitForCopyTimeout = 3 * time.Minute
)

var (
Expand Down Expand Up @@ -209,6 +215,7 @@ type DriverOptions struct {
SkipMatchingTagCacheExpireInMinutes int
VolStatsCacheExpireInMinutes int
PrintVolumeStatsCallLogs bool
SasTokenExpirationMinutes int
}

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -260,6 +267,8 @@ type Driver struct {
resizeFileShareFailureCache azcache.Resource
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
// sas expiry time for azcopy in volume clone
sasTokenExpirationMinutes int
}

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
Expand Down Expand Up @@ -287,6 +296,7 @@ func NewDriver(options *DriverOptions) *Driver {
driver.appendClosetimeoOption = options.AppendClosetimeoOption
driver.appendNoShareSockOption = options.AppendNoShareSockOption
driver.printVolumeStatsCallLogs = options.PrintVolumeStatsCallLogs
driver.sasTokenExpirationMinutes = options.SasTokenExpirationMinutes
driver.volLockMap = newLockMap()
driver.subnetLockMap = newLockMap()
driver.volumeLocks = newVolumeLocks()
Expand Down Expand Up @@ -363,6 +373,7 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
})
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
Expand Down Expand Up @@ -889,6 +900,65 @@ func (d *Driver) ResizeFileShare(ctx context.Context, subsID, resourceGroup, acc
})
}

// CopyFileShare copies a fileshare in the same storage account
func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
if shareOptions.Protocol == storage.EnabledProtocolsNFS {
return fmt.Errorf("protocol nfs is not supported for volume cloning")
}
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
}
resourceGroupName, accountName, srcFileShareName, _, _, _, err := GetFileShareInfo(sourceVolumeID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
dstFileShareName := shareOptions.Name
if srcFileShareName == "" || dstFileShareName == "" {
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
}

klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if genErr != nil {
return genErr
}

timeAfter := time.After(waitForCopyTimeout)
timeTick := time.Tick(waitForCopyInterval)
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSasToken)

jobState, percent, err := getAzcopyJob(dstFileShareName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == AzcopyJobError || jobState == AzcopyJobCompleted {
return err
}
klog.V(2).Infof("begin to copy fileshare %s to %s", srcFileShareName, dstFileShareName)
for {
select {
case <-timeTick:
jobState, percent, err := getAzcopyJob(dstFileShareName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case AzcopyJobError, AzcopyJobCompleted:
return err
case AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstFileShareName, copyErr, string(out))
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed", srcFileShareName, dstFileShareName)
}
}
}

// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares
func (d *Driver) GetTotalAccountQuota(ctx context.Context, subsID, resourceGroup, accountName string) (int32, int32, error) {
fileshares, err := d.cloud.FileClient.WithSubscriptionID(subsID).ListFileShare(ctx, resourceGroup, accountName, "", "")
Expand Down
68 changes: 66 additions & 2 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
"github.com/Azure/azure-storage-file-go/azfile"
"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -104,6 +106,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := getAzcopyJob(volName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
defer d.volumeLocks.Release(volName)
Expand Down Expand Up @@ -516,7 +523,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

var volumeID string
mc := metrics.NewMetricContext(azureFileCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, subsID, d.Name)
requestName := "controller_create_volume"
if req.GetVolumeContentSource() != nil {
switch req.VolumeContentSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
requestName = "controller_create_volume_from_snapshot"
case *csi.VolumeContentSource_Volume:
requestName = "controller_create_volume_from_volume"
}
}
mc := metrics.NewMetricContext(azureFileCSIDriverName, requestName, d.cloud.ResourceGroup, subsID, d.Name)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
Expand All @@ -543,9 +559,20 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
return nil, status.Errorf(codes.Internal, "failed to create file share(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, subsID, resourceGroup, location, fileShareSize, err)
}
if req.GetVolumeContentSource() != nil {
accountKeyCopy, err := d.GetStorageAccesskey(ctx, accountOptions, req.GetSecrets(), secretName, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(ctx, req, accountKeyCopy, shareOptions, storageEndpointSuffix); err != nil {
return nil, err
}
// storeAccountKey is not needed here since copy volume is only using SAS token
storeAccountKey = false
}
klog.V(2).Infof("create file share %s on storage account %s successfully", validFileShareName, accountName)

if isDiskFsType(fsType) && !strings.HasSuffix(diskName, vhdSuffix) {
if isDiskFsType(fsType) && !strings.HasSuffix(diskName, vhdSuffix) && req.GetVolumeContentSource() == nil {
if accountKey == "" {
if accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, req.GetSecrets(), secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
Expand Down Expand Up @@ -619,6 +646,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
VolumeId: volumeID,
CapacityBytes: capacityBytes,
VolumeContext: parameters,
ContentSource: req.GetVolumeContentSource(),
},
}, nil
}
Expand Down Expand Up @@ -686,6 +714,18 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
return &csi.DeleteVolumeResponse{}, nil
}

func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyFileShare(ctx, req, accountKey, shareOptions, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
}

// ControllerGetVolume get volume
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
Expand Down Expand Up @@ -1243,3 +1283,27 @@ func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
}
return nil
}

func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
credential, err := service.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
}
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.file.%s/", accountName, storageEndpointSuffix), credential, nil)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
}
nowTime := time.Now()
sasURL, err := serviceClient.GetSASURL(
sas.AccountResourceTypes{Object: true, Service: true, Container: true},
sas.AccountPermissions{Read: true, List: true, Write: true},
time.Now().Add(time.Duration(expiryTime)*time.Minute), &service.GetSASURLOptions{StartTime: &nowTime})
if err != nil {
return "", err
}
u, err := url.Parse(sasURL)
if err != nil {
return "", err
}
return "?" + u.RawQuery, nil
}
Loading