Skip to content

Commit 58726b9

Browse files
tasherif-msftjhendrixMSFTealsurrichardpark-msftazure-sdk
authored
[AzDatalake] Lease Clients Implementation (#21297)
* Enable gocritic during linting (#20715) Enabled gocritic's evalOrder to catch dependencies on undefined behavior on return statements. Updated to latest version of golangci-lint. Fixed issue in azblob flagged by latest linter. * Cosmos DB: Enable merge support (#20716) * Adding header and value * Wiring and tests * format * Fixing value * change log * [azservicebus, azeventhubs] Stress test and logging improvement (#20710) Logging improvements: * Updating the logging to print more tracing information (per-link) in prep for the bigger release coming up. * Trimming out some of the verbose logging, seeing if I can get it a bit more reasonable. Stress tests: * Add a timestamp to the log name we generate and also default to append, not overwrite. * Use 0.5 cores, 0.5GB as our baseline. Some pods use more and I'll tune them more later. * update proxy version (#20712) Co-authored-by: Scott Beddall <[email protected]> * Return an error when you try to send a message that's too large. (#20721) This now works just like the message batch - you'll get an ErrMessageTooLarge if you attempt to send a message that's too large for the link's configured size. NOTE: there's a patch to `internal/go-amqp/Sender.go` to match what's in go-amqp's main so it returns a programmatically useful error when the message is too large. Fixes #20647 * Changes in test that is failing in pipeline (#20693) * [azservicebus, azeventhubs] Treat 'entity full' as a fatal error (#20722) When the remote entity is full we get a resource-limit-exceeded condition. This isn't something we should keep retrying on and it's best to just abort and let the user know immediately, rather than hoping it might eventually clear out. This affected both Event Hubs and Service Bus. Fixes #20647 * [azservicebus/azeventhubs] Redirect stderr and stdout to tee (#20726) * Update changelog with latest features (#20730) * Update changelog with latest features Prepare for upcoming release. * bump minor version * pass along the artifact name so we can override it later (#20732) Co-authored-by: scbedd <[email protected]> * [azeventhubs] Fixing checkpoint store race condition (#20727) The checkpoint store wasn't guarding against multiple owners claiming for the first time - fixing this by using IfNoneMatch Fixes #20717 * Fix azidentity troubleshooting guide link (#20736) * [Release] sdk/resourcemanager/paloaltonetworksngfw/armpanngfw/0.1.0 (#20437) * [Release] sdk/resourcemanager/paloaltonetworksngfw/armpanngfw/0.1.0 generation from spec commit: 85fb4ac6f8bfefd179e6c2632976a154b5c9ff04 * client factory * fix * fix * update * add sdk/resourcemanager/postgresql/armpostgresql live test (#20685) * add sdk/resourcemanager/postgresql/armpostgresql live test * update assets.json * set subscriptionId default value * format * add sdk/resourcemanager/eventhub/armeventhub live test (#20686) * add sdk/resourcemanager/eventhub/armeventhub live test * update assets * add sdk/resourcemanager/compute/armcompute live test (#20048) * add sdk/resourcemanager/compute/armcompute live test * skus filter * fix subscriptionId default value * fix * gofmt * update recording * sdk/resourcemanager/network/armnetwork live test (#20331) * sdk/resourcemanager/network/armnetwork live test * update subscriptionId default value * update recording * add sdk/resourcemanager/cosmos/armcosmos live test (#20705) * add sdk/resourcemanager/cosmos/armcosmos live test * update assets.json * update assets.json * update assets.json * update assets.json * Increment package version after release of azcore (#20740) * [azeventhubs] Improperly resetting etag in the checkpoint store (#20737) We shouldn't be resetting the etag to nil - it's what we use to enforce a "single winner" when doing ownership claims. The bug here was two-fold: I had bad logic in my previous claim ownership, which I fixed in a previous PR, but we need to reflect that same constraint properly in our in-memory checkpoint store for these tests. * Eng workflows sync and branch cleanup additions (#20743) Co-authored-by: James Suplizio <[email protected]> * [azeventhubs] Latest start position can also be inclusive (ie, get the latest message) (#20744) * Update GitHubEventProcessor version and remove pull_request_review procesing (#20751) Co-authored-by: James Suplizio <[email protected]> * Rename DisableAuthorityValidationAndInstanceDiscovery (#20746) * fix (#20707) * AzFile (#20739) * azfile: Fixing connection string parsing logic (#20798) * Fixing connection string parse logic * Update README * [azadmin] fix flaky test (#20758) * fix flaky test * charles suggestion * Prepare azidentity v1.3.0 for release (#20756) * Fix broken podman link (#20801) Co-authored-by: Wes Haggard <[email protected]> * [azquery] update doc comments (#20755) * update doc comments * update statistics and visualization generation * prep-for-release * Fixed contribution section (#20752) Co-authored-by: Bob Tabor <[email protected]> * [azeventhubs,azservicebus] Some API cleanup, renames (#20754) * Adding options to UpdateCheckpoint(), just for future potential expansion * Make Offset an int64, not a *int64 (it's not optional, it'll always come back with ReceivedEvents) * Adding more logging into the checkpoint store. * Point all imports at the production go-amqp * Add supporting features to enable distributed tracing (#20301) (#20708) * Add supporting features to enable distributed tracing This includes new internal pipeline policies and other supporting types. See the changelog for a full description. Added some missing doc comments. * fix linter issue * add net.peer.name trace attribute sequence custom HTTP header policy before logging policy. sequence logging policy after HTTP trace policy. keep body download policy at the end. * add span for iterating over pages * Restore ARM CAE support for azcore beta (#20657) This reverts commit 9020972. * Upgrade to stable azcore (#20808) * Increment package version after release of data/azcosmos (#20807) * Updating changelog (#20810) * Add fake package to azcore (#20711) * Add fake package to azcore This is the supporting infrastructure for the generated SDK fakes. * fix doc comment * Updating CHANGELOG.md (#20809) * changelog (#20811) * Increment package version after release of storage/azfile (#20813) * Update changelog (azblob) (#20815) * Updating CHANGELOG.md * Update the changelog with correct version * [azquery] migration guide (#20742) * migration guide * Charles feedback * Richard feedback --------- Co-authored-by: Charles Lowell <[email protected]> * Increment package version after release of monitor/azquery (#20820) * [keyvault] prep for release (#20819) * prep for release * perf tests * update date * lease implementation and tests * tests * handled errors * nit --------- Co-authored-by: Joel Hendrix <[email protected]> Co-authored-by: Matias Quaranta <[email protected]> Co-authored-by: Richard Park <[email protected]> Co-authored-by: Azure SDK Bot <[email protected]> Co-authored-by: Scott Beddall <[email protected]> Co-authored-by: siminsavani-msft <[email protected]> Co-authored-by: scbedd <[email protected]> Co-authored-by: Charles Lowell <[email protected]> Co-authored-by: Peng Jiahui <[email protected]> Co-authored-by: James Suplizio <[email protected]> Co-authored-by: Sourav Gupta <[email protected]> Co-authored-by: gracewilcox <[email protected]> Co-authored-by: Wes Haggard <[email protected]> Co-authored-by: Bob Tabor <[email protected]> Co-authored-by: Bob Tabor <[email protected]>
1 parent 757507a commit 58726b9

File tree

10 files changed

+689
-32
lines changed

10 files changed

+689
-32
lines changed

sdk/storage/azdatalake/assets.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "go",
44
"TagPrefix": "go/storage/azdatalake",
5-
"Tag": "go/storage/azdatalake_9dd1cc3e0e"
5+
"Tag": "go/storage/azdatalake_78f150eb1d"
66
}

sdk/storage/azdatalake/directory/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,14 @@ func (d *Client) renamePathInURL(newName string) (string, string, string) {
219219
lastIndex := strings.LastIndex(endpoint, separator)
220220
// Split the string based on the last occurrence of the separator
221221
firstPart := endpoint[:lastIndex] // From the beginning of the string to the last occurrence of the separator
222-
newPathURL, newBlobURL := shared.GetURLs(runtime.JoinPaths(firstPart, newName))
222+
newBlobURL, newPathURL := shared.GetURLs(runtime.JoinPaths(firstPart, newName))
223223
parsedNewURL, _ := url.Parse(d.DFSURL())
224224
return parsedNewURL.Path, newPathURL, newBlobURL
225225
}
226226

227227
// Rename renames a directory (dfs1)
228228
func (d *Client) Rename(ctx context.Context, newName string, options *RenameOptions) (RenameResponse, error) {
229-
newPathWithoutURL, newBlobURL, newPathURL := d.renamePathInURL(newName)
229+
newPathWithoutURL, newPathURL, newBlobURL := d.renamePathInURL(newName)
230230
lac, mac, smac, createOpts := path.FormatRenameOptions(options, newPathWithoutURL)
231231
var newBlobClient *blockblob.Client
232232
var err error

sdk/storage/azdatalake/file/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,14 @@ func (f *Client) renamePathInURL(newName string) (string, string, string) {
224224
lastIndex := strings.LastIndex(endpoint, separator)
225225
// Split the string based on the last occurrence of the separator
226226
firstPart := endpoint[:lastIndex] // From the beginning of the string to the last occurrence of the separator
227-
newPathURL, newBlobURL := shared.GetURLs(runtime.JoinPaths(firstPart, newName))
227+
newBlobURL, newPathURL := shared.GetURLs(runtime.JoinPaths(firstPart, newName))
228228
parsedNewURL, _ := url.Parse(f.DFSURL())
229229
return parsedNewURL.Path, newPathURL, newBlobURL
230230
}
231231

232232
// Rename renames a file (dfs1)
233233
func (f *Client) Rename(ctx context.Context, newName string, options *RenameOptions) (RenameResponse, error) {
234-
newPathWithoutURL, newBlobURL, newPathURL := f.renamePathInURL(newName)
234+
newPathWithoutURL, newPathURL, newBlobURL := f.renamePathInURL(newName)
235235
lac, mac, smac, createOpts := path.FormatRenameOptions(options, newPathWithoutURL)
236236
var newBlobClient *blockblob.Client
237237
var err error

sdk/storage/azdatalake/filesystem/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,15 @@ func (fs *Client) BlobURL() string {
184184
// The new directory.Client uses the same request policy pipeline as the Client.
185185
func (fs *Client) NewDirectoryClient(directoryPath string) *directory.Client {
186186
dirURL := runtime.JoinPaths(fs.generatedFSClientWithDFS().Endpoint(), directoryPath)
187-
dirURL, blobURL := shared.GetURLs(dirURL)
187+
blobURL, dirURL := shared.GetURLs(dirURL)
188188
return (*directory.Client)(base.NewPathClient(dirURL, blobURL, fs.containerClient().NewBlockBlobClient(directoryPath), fs.generatedFSClientWithDFS().InternalClient().WithClientName(shared.DirectoryClient), fs.sharedKey(), fs.identityCredential(), fs.getClientOptions()))
189189
}
190190

191191
// NewFileClient creates a new file.Client object by concatenating file path to the end of this Client's URL.
192192
// The new file.Client uses the same request policy pipeline as the Client.
193193
func (fs *Client) NewFileClient(filePath string) *file.Client {
194194
fileURL := runtime.JoinPaths(fs.generatedFSClientWithDFS().Endpoint(), filePath)
195-
fileURL, blobURL := shared.GetURLs(filePath)
195+
blobURL, fileURL := shared.GetURLs(fileURL)
196196
return (*file.Client)(base.NewPathClient(fileURL, blobURL, fs.containerClient().NewBlockBlobClient(filePath), fs.generatedFSClientWithDFS().InternalClient().WithClientName(shared.FileClient), fs.sharedKey(), fs.identityCredential(), fs.getClientOptions()))
197197
}
198198

sdk/storage/azdatalake/internal/testcommon/clients_auth.go

+14
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,20 @@ func GetFileClient(fsName, fName string, t *testing.T, accountType TestAccountTy
141141
return fileClient, err
142142
}
143143

144+
func CreateNewFile(ctx context.Context, _require *require.Assertions, fileName string, filesystemClient *filesystem.Client) *file.Client {
145+
fileClient := filesystemClient.NewFileClient(fileName)
146+
_, err := fileClient.Create(ctx, nil)
147+
_require.Nil(err)
148+
return fileClient
149+
}
150+
151+
func CreateNewDir(ctx context.Context, _require *require.Assertions, dirName string, filesystemClient *filesystem.Client) *directory.Client {
152+
dirClient := filesystemClient.NewDirectoryClient(dirName)
153+
_, err := dirClient.Create(ctx, nil)
154+
_require.Nil(err)
155+
return dirClient
156+
}
157+
144158
func GetDirClient(fsName, dirName string, t *testing.T, accountType TestAccountType, options *directory.ClientOptions) (*directory.Client, error) {
145159
if options == nil {
146160
options = &directory.ClientOptions{}

sdk/storage/azdatalake/lease/filesystem_client.go renamed to sdk/storage/azdatalake/lease/client.go

+29-12
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,36 @@ package lease
88

99
import (
1010
"context"
11+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
1112
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease"
1213
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
14+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/base"
15+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/exported"
16+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/generated"
1317
)
1418

1519
// FilesystemClient provides lease functionality for the underlying filesystem client.
1620
type FilesystemClient struct {
17-
leaseID *string
1821
containerClient *lease.ContainerClient
22+
leaseID *string
1923
}
2024

2125
// FilesystemClientOptions contains the optional values when creating a FilesystemClient.
22-
type FilesystemClientOptions struct {
23-
// LeaseID contains a caller-provided lease ID.
24-
LeaseID *string
25-
}
26+
type FilesystemClientOptions = lease.ContainerClientOptions
2627

2728
// NewFilesystemClient creates a filesystem lease client for the provided filesystem client.
2829
// - client - an instance of a filesystem client
2930
// - options - client options; pass nil to accept the default values
3031
func NewFilesystemClient(client *filesystem.Client, options *FilesystemClientOptions) (*FilesystemClient, error) {
31-
// TODO: set up container lease client
32-
return nil, nil
32+
_, _, containerClient := base.InnerClients((*base.CompositeClient[generated.FileSystemClient, generated.FileSystemClient, container.Client])(client))
33+
containerLeaseClient, err := lease.NewContainerClient(containerClient, options)
34+
if err != nil {
35+
return nil, exported.ConvertToDFSError(err)
36+
}
37+
return &FilesystemClient{
38+
containerClient: containerLeaseClient,
39+
leaseID: containerLeaseClient.LeaseID(),
40+
}, nil
3341
}
3442

3543
// LeaseID returns leaseID of the client.
@@ -42,33 +50,42 @@ func (c *FilesystemClient) LeaseID() *string {
4250
// For more information, see https://docs.microsoft.com/rest/api/storageservices/lease-blob.
4351
func (c *FilesystemClient) AcquireLease(ctx context.Context, duration int32, o *FilesystemAcquireOptions) (FilesystemAcquireResponse, error) {
4452
opts := o.format()
45-
return c.containerClient.AcquireLease(ctx, duration, opts)
53+
resp, err := c.containerClient.AcquireLease(ctx, duration, opts)
54+
return resp, exported.ConvertToDFSError(err)
4655
}
4756

4857
// BreakLease breaks the filesystem's previously-acquired lease (if it exists). Pass the LeaseBreakDefault (-1)
4958
// constant to break a fixed-Duration lease when it expires or an infinite lease immediately.
5059
// For more information, see https://docs.microsoft.com/rest/api/storageservices/lease-blob.
5160
func (c *FilesystemClient) BreakLease(ctx context.Context, o *FilesystemBreakOptions) (FilesystemBreakResponse, error) {
5261
opts := o.format()
53-
return c.containerClient.BreakLease(ctx, opts)
62+
resp, err := c.containerClient.BreakLease(ctx, opts)
63+
return resp, exported.ConvertToDFSError(err)
5464
}
5565

5666
// ChangeLease changes the filesystem's lease ID.
5767
// For more information, see https://docs.microsoft.com/rest/api/storageservices/lease-blob.
5868
func (c *FilesystemClient) ChangeLease(ctx context.Context, proposedLeaseID string, o *FilesystemChangeOptions) (FilesystemChangeResponse, error) {
5969
opts := o.format()
60-
return c.containerClient.ChangeLease(ctx, proposedLeaseID, opts)
70+
resp, err := c.containerClient.ChangeLease(ctx, proposedLeaseID, opts)
71+
if err != nil {
72+
return resp, exported.ConvertToDFSError(err)
73+
}
74+
c.leaseID = &proposedLeaseID
75+
return resp, nil
6176
}
6277

6378
// RenewLease renews the filesystem's previously-acquired lease.
6479
// For more information, see https://docs.microsoft.com/rest/api/storageservices/lease-blob.
6580
func (c *FilesystemClient) RenewLease(ctx context.Context, o *FilesystemRenewOptions) (FilesystemRenewResponse, error) {
6681
opts := o.format()
67-
return c.containerClient.RenewLease(ctx, opts)
82+
resp, err := c.containerClient.RenewLease(ctx, opts)
83+
return resp, exported.ConvertToDFSError(err)
6884
}
6985

7086
// ReleaseLease releases the filesystem's previously-acquired lease.
7187
func (c *FilesystemClient) ReleaseLease(ctx context.Context, o *FilesystemReleaseOptions) (FilesystemReleaseResponse, error) {
7288
opts := o.format()
73-
return c.containerClient.ReleaseLease(ctx, opts)
89+
resp, err := c.containerClient.ReleaseLease(ctx, opts)
90+
return resp, exported.ConvertToDFSError(err)
7491
}

0 commit comments

Comments
 (0)