Skip to content

azblob: Return io.ErrUnexpectedEOF as error in UploadStream #22109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azblob/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Fixed an issue that would cause metadata keys with empty values to be omitted when enumerating blobs.
* Fixed an issue where passing empty map to set blob tags API was causing panic. Fixes [#21869](https://github.com/Azure/azure-sdk-for-go/issues/21869).
* Fixed an issue where downloaded file has incorrect size when not a multiple of block size. Fixes [#21995](https://github.com/Azure/azure-sdk-for-go/issues/21995).
* Fixed case where `io.ErrUnexpectedEOF` was treated as expected error in `UploadStream`. Fixes [#21837](https://github.com/Azure/azure-sdk-for-go/issues/21837).

### Other Changes

Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/azblob/blockblob/chunkwriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
}

var n int
n, err = io.ReadFull(src, buffer)
n, err = shared.ReadAtLeast(src, buffer, len(buffer))

if n > 0 {
// some data was read, upload it
Expand Down Expand Up @@ -108,7 +108,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
}

if err != nil { // The reader is done, no more outgoing buffers
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
if errors.Is(err, io.EOF) {
// these are expected errors, we don't surface those
err = nil
} else {
Expand Down
128 changes: 128 additions & 0 deletions sdk/storage/azblob/blockblob/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5033,6 +5033,134 @@ func (s *BlockBlobUnrecordedTestsSuite) TestUploadStreamToBlobProperties() {
_require.EqualValues(actualBlobData, blobData)
}

func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadDownloadStream() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

blobSize := 11 * 1024 * 1024
bufferSize := 2 * 1024 * 1024
maxBuffers := 2

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

// Set up test blob
blobName := testcommon.GenerateBlobName(testName)
bbClient := testcommon.GetBlockBlobClient(blobName, containerClient)
blobContentReader, blobData := testcommon.GenerateData(blobSize)

_, err = bbClient.UploadStream(context.Background(), blobContentReader, &blockblob.UploadStreamOptions{
BlockSize: int64(bufferSize),
Concurrency: maxBuffers,
Metadata: testcommon.BasicMetadata,
Tags: testcommon.BasicBlobTagsMap,
HTTPHeaders: &testcommon.BasicHeaders,
})
_require.NoError(err)

downloadResponse, err := bbClient.DownloadStream(context.Background(), nil)
_require.NoError(err)

bbClient2 := testcommon.GetBlockBlobClient("blobName2", containerClient)

// UploadStream using http.Response.Body as the reader
_, err = bbClient2.UploadStream(context.Background(), downloadResponse.Body, &blockblob.UploadStreamOptions{
BlockSize: int64(bufferSize),
Concurrency: maxBuffers,
})
_require.NoError(err)

downloadResp2, err := bbClient2.DownloadStream(context.Background(), nil)
_require.NoError(err)

// Assert that the content is correct
actualBlobData, err := io.ReadAll(downloadResp2.Body)
_require.NoError(err)
_require.Equal(len(actualBlobData), len(blobData))
_require.EqualValues(actualBlobData, blobData)
}

// This test simulates UploadStream and DownloadBuffer methods,
// and verifies length and content of file
func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamDownloadBuffer() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

const MiB = 1024 * 1024
testUploadDownload := func(contentSize int) {
content := make([]byte, contentSize)
_, _ = rand.Read(content)
contentMD5 := md5.Sum(content)
body := streaming.NopCloser(bytes.NewReader(content))

srcBlob := containerClient.NewBlockBlobClient("srcblob")

// Prepare source bbClient for copy.
_, err = srcBlob.UploadStream(context.Background(), body, &blockblob.UploadStreamOptions{
BlockSize: 4 * MiB,
Concurrency: 5,
})
_require.NoError(err)

// Download to a buffer and verify contents
buff := make([]byte, contentSize)
b := blob.DownloadBufferOptions{
BlockSize: 5 * MiB,
Concurrency: 4,
}
n, err := srcBlob.DownloadBuffer(context.Background(), buff, &b)
_require.NoError(err)
_require.Equal(int64(contentSize), n)
_require.Equal(contentMD5, md5.Sum(buff[:]))
}

testUploadDownload(0) // zero byte blob
testUploadDownload(5 * MiB)
testUploadDownload(20 * MiB)
testUploadDownload(199 * MiB)
}

type fakeReader struct {
cnt int
}

func (a *fakeReader) Read(bytes []byte) (count int, err error) {
if a.cnt < 5 {
_, buf := testcommon.GenerateData(1024)
n := copy(bytes, buf)
a.cnt++
return n, nil
}
return 0, io.ErrUnexpectedEOF
}

func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamUsingCustomReader() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient)

r := &fakeReader{}
_, err = bbClient.UploadStream(context.Background(), r, nil)
_require.Error(err)
_require.Equal(err, io.ErrUnexpectedEOF)
}

func (s *BlockBlobRecordedTestsSuite) TestBlockBlobSetTierOnVersions() {
_require := require.New(s.T())
testName := s.T().Name()
Expand Down
24 changes: 24 additions & 0 deletions sdk/storage/azblob/internal/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,27 @@ func IsIPEndpointStyle(host string) bool {
}
return net.ParseIP(host) != nil
}

// ReadAtLeast reads from r into buf until it has read at least min bytes.
// It returns the number of bytes copied and an error.
// The EOF error is returned if no bytes were read or
// EOF happened after reading fewer than min bytes.
// If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer.
// On return, n >= min if and only if err == nil.
// If r returns an error having read at least min bytes, the error is dropped.
// This method is same as io.ReadAtLeast except that it does not
// return io.ErrUnexpectedEOF when fewer than min bytes are read.
func ReadAtLeast(r io.Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
}
return
}