Commit fb7ebad

Merge pull request #249 from Azure/dev

Release v0.13

2 parents 6df5d9a + e76e24c

11 files changed: +381 -96 lines

ChangeLog.md (+4)

```diff
@@ -2,6 +2,10 @@
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
 
+## Version 0.13.0:
+- Validate the echoed client request ID from the service
+- Added a new TransferManager option for UploadStreamToBlockBlob to fine-tune concurrency and memory usage
+
 ## Version 0.12.0:
 - Added support for [Customer Provided Key](https://docs.microsoft.com/en-us/azure/storage/common/storage-service-encryption), which lets users encrypt their data within client applications before uploading to Azure Storage and decrypt it while downloading to the client
 - Read more about [Azure key vault](https://docs.microsoft.com/en-us/azure/key-vault/general/overview), [Encryption scope](https://docs.microsoft.com/en-us/azure/storage/blobs/encryption-scope-manage?tabs=portal), [managing encryption scope](https://docs.microsoft.com/en-us/azure/storage/blobs/encryption-scope-manage?tabs=portal), and how to [configure customer managed keys](https://docs.microsoft.com/en-us/azure/data-explorer/customer-managed-keys-portal)
```
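For orientation on the headline change, here is a minimal sketch of wiring a TransferManager into UploadStreamToBlockBlob. It assumes a BlockBlobURL has already been constructed with real credentials, and the input file name is hypothetical; NewStaticBuffer, the TransferManager option field, and UploadStreamToBlockBlob are the API surface this release touches:

```go
package main

import (
	"context"
	"os"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// Hypothetical setup: a BlockBlobURL built elsewhere with real credentials.
	var blobURL azblob.BlockBlobURL

	// A static set of four 4 MiB buffers bounds both memory use and concurrency.
	tm, err := azblob.NewStaticBuffer(4*1024*1024, 4)
	if err != nil {
		panic(err)
	}
	defer tm.Close()

	f, err := os.Open("input.bin") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// The TransferManager replaces the older MaxBuffers/BufferSize tuning knobs.
	_, err = azblob.UploadStreamToBlockBlob(context.Background(), f, blobURL,
		azblob.UploadStreamToBlockBlobOptions{TransferManager: tm})
	if err != nil {
		panic(err)
	}
}
```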

azblob/bytes_writer.go (+24 -24)

The diff replaces all 24 lines with textually identical content (every line is marked both removed and re-added, so this is presumably a whitespace or line-ending normalization). The resulting file:

```go
package azblob

import (
	"errors"
)

type bytesWriter []byte

func newBytesWriter(b []byte) bytesWriter {
	return b
}

func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) {
	if off >= int64(len(c)) || off < 0 {
		return 0, errors.New("Offset value is out of range")
	}

	n := copy(c[int(off):], b)
	if n < len(b) {
		return n, errors.New("Not enough space for all bytes")
	}

	return n, nil
}
```
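For context, bytesWriter turns a caller-supplied slice into a fixed-capacity io.WriterAt: writes land at the requested offset, and anything running past the end of the slice is truncated and reported as an error. A minimal sketch of those semantics (illustrative only; the type is unexported, so code like this would have to live inside package azblob):

```go
package azblob

import "fmt"

func exampleBytesWriter() {
	buf := make([]byte, 8)
	w := newBytesWriter(buf)

	// A write within bounds copies fully and reports no error.
	n, err := w.WriteAt([]byte("abc"), 2)
	fmt.Println(n, err) // 3 <nil>

	// A write that overruns the slice copies what fits, then errors.
	n, err = w.WriteAt([]byte("toolong"), 6)
	fmt.Println(n, err) // 2 Not enough space for all bytes
}
```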

azblob/chunkwriting.go (+40 -58)

```diff
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 
 	guuid "github.com/google/uuid"
 )
@@ -29,7 +30,9 @@ type blockWriter interface {
 // choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model).
 // We can even provide a utility to dial this number in for customer networks to optimize their copies.
 func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
-	o.defaults()
+	if err := o.defaults(); err != nil {
+		return nil, err
+	}
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -41,19 +44,7 @@ func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
 		to:    to,
 		id:    newID(),
 		o:     o,
-		ch:    make(chan copierChunk, 1),
 		errCh: make(chan error, 1),
-		buffers: sync.Pool{
-			New: func() interface{} {
-				return make([]byte, o.BufferSize)
-			},
-		},
-	}
-
-	// Starts the pools of concurrent writers.
-	cp.wg.Add(o.MaxBuffers)
-	for i := 0; i < o.MaxBuffers; i++ {
-		go cp.writer()
 	}
 
 	// Send all our chunks until we get an error.
@@ -84,24 +75,21 @@ type copier struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
+	// o contains our options for uploading.
+	o UploadStreamToBlockBlobOptions
+
+	// id provides the ids for each chunk.
+	id *id
+
 	// reader is the source to be written to storage.
 	reader io.Reader
 	// to is the location we are writing our chunks to.
 	to blockWriter
 
-	id *id
-	o  UploadStreamToBlockBlobOptions
-
-	// num is the current chunk we are on.
-	num int32
-	// ch is used to pass the next chunk of data from our reader to one of the writers.
-	ch chan copierChunk
 	// errCh is used to hold the first error from our concurrent writers.
 	errCh chan error
 	// wg provides a count of how many writers we are waiting to finish.
 	wg sync.WaitGroup
-	// buffers provides a pool of chunks that can be reused.
-	buffers sync.Pool
 
 	// result holds the final result from blob storage after we have submitted all chunks.
 	result *BlockBlobCommitBlockListResponse
@@ -130,26 +118,38 @@ func (c *copier) sendChunk() error {
 		return err
 	}
 
-	buffer := c.buffers.Get().([]byte)
+	buffer := c.o.TransferManager.Get()
+	if len(buffer) == 0 {
+		return fmt.Errorf("TransferManager returned a 0 size buffer, this is a bug in the manager")
+	}
+
 	n, err := io.ReadFull(c.reader, buffer)
 	switch {
 	case err == nil && n == 0:
 		return nil
 	case err == nil:
-		c.ch <- copierChunk{
-			buffer: buffer[0:n],
-			id:     c.id.next(),
-		}
+		id := c.id.next()
+		c.wg.Add(1)
+		c.o.TransferManager.Run(
+			func() {
+				defer c.wg.Done()
+				c.write(copierChunk{buffer: buffer[0:n], id: id})
+			},
+		)
 		return nil
 	case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
 		return io.EOF
 	}
 
 	if err == io.EOF || err == io.ErrUnexpectedEOF {
-		c.ch <- copierChunk{
-			buffer: buffer[0:n],
-			id:     c.id.next(),
-		}
+		id := c.id.next()
+		c.wg.Add(1)
+		c.o.TransferManager.Run(
+			func() {
+				defer c.wg.Done()
+				c.write(copierChunk{buffer: buffer[0:n], id: id})
+			},
+		)
 		return io.EOF
 	}
 	if err := c.getErr(); err != nil {
@@ -158,41 +158,23 @@
 	return err
 }
 
-// writer writes chunks sent on a channel.
-func (c *copier) writer() {
-	defer c.wg.Done()
-
-	for chunk := range c.ch {
-		if err := c.write(chunk); err != nil {
-			if !errors.Is(err, context.Canceled) {
-				select {
-				case c.errCh <- err:
-					c.cancel()
-				default:
-				}
-				return
-			}
-		}
-	}
-}
-
 // write uploads a chunk to blob storage.
-func (c *copier) write(chunk copierChunk) error {
-	defer c.buffers.Put(chunk.buffer)
+func (c *copier) write(chunk copierChunk) {
+	defer c.o.TransferManager.Put(chunk.buffer)
 
 	if err := c.ctx.Err(); err != nil {
-		return err
+		return
 	}
 	_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
 	if err != nil {
-		return fmt.Errorf("write error: %w", err)
+		c.errCh <- fmt.Errorf("write error: %w", err)
+		return
 	}
-	return nil
+	return
 }
 
 // close commits our blocks to blob storage and closes our writer.
 func (c *copier) close() error {
-	close(c.ch)
 	c.wg.Wait()
 
 	if err := c.getErr(); err != nil {
@@ -219,11 +201,11 @@ func newID() *id {
 	return &id{u: u}
 }
 
-// next returns the next ID. This is not thread-safe.
+// next returns the next ID.
 func (id *id) next() string {
-	defer func() { id.num++ }()
+	defer atomic.AddUint32(&id.num, 1)
 
-	binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), id.num)
+	binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), atomic.LoadUint32(&id.num))
 	str := base64.StdEncoding.EncodeToString(id.u[:])
 	id.all = append(id.all, str)
 
```
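The copier now delegates buffer management and goroutine scheduling to the TransferManager supplied in the options: Get hands out a chunk buffer, Run schedules the upload closure, and Put recycles the buffer afterwards. Judging from those calls and the Close used in the tests below, a minimal custom implementation might look like this sketch (naiveTransferManager is hypothetical: unbounded concurrency, freshly allocated buffers; a real manager would pool and throttle):

```go
package transfer

import (
	"sync"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

// naiveTransferManager is a hypothetical TransferManager: every Get allocates
// a fresh buffer, Put discards it, and Run spawns an unthrottled goroutine.
type naiveTransferManager struct {
	size int
	wg   sync.WaitGroup
}

// Compile-time check that we satisfy the interface the copier relies on.
var _ azblob.TransferManager = (*naiveTransferManager)(nil)

// Get hands the copier a buffer to fill with the next chunk.
func (m *naiveTransferManager) Get() []byte { return make([]byte, m.size) }

// Put is called once the copier is done with a chunk's buffer.
func (m *naiveTransferManager) Put(b []byte) {} // nothing to recycle

// Run schedules a chunk upload; here, one goroutine per chunk.
func (m *naiveTransferManager) Run(f func()) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		f()
	}()
}

// Close waits for outstanding work and releases resources.
func (m *naiveTransferManager) Close() {
	m.wg.Wait()
}
```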

azblob/chunkwriting_test.go (+24 -1)

```diff
@@ -141,8 +141,17 @@ func TestGetErr(t *testing.T) {
 		{"Err returned", context.Background(), err, err},
 	}
 
+	tm, err := NewStaticBuffer(_1MiB, 1)
+	if err != nil {
+		panic(err)
+	}
+
 	for _, test := range tests {
-		c := copier{errCh: make(chan error, 1), ctx: test.ctx}
+		c := copier{
+			errCh: make(chan error, 1),
+			ctx:   test.ctx,
+			o:     UploadStreamToBlockBlobOptions{TransferManager: tm},
+		}
 		if test.err != nil {
 			c.errCh <- test.err
 		}
@@ -160,6 +169,12 @@ func TestCopyFromReader(t *testing.T) {
 	canceled, cancel := context.WithCancel(context.Background())
 	cancel()
 
+	spm, err := NewSyncPool(_1MiB, 2)
+	if err != nil {
+		panic(err)
+	}
+	defer spm.Close()
+
 	tests := []struct {
 		desc     string
 		ctx      context.Context
@@ -231,6 +246,14 @@
 			fileSize: 12 * _1MiB,
 			o:        UploadStreamToBlockBlobOptions{MaxBuffers: 5, BufferSize: 8 * 1024 * 1024},
 		},
+		{
+			desc:     "Send file(12 MiB) with default UploadStreamToBlockBlobOptions using SyncPool manager",
+			ctx:      context.Background(),
+			fileSize: 12 * _1MiB,
+			o: UploadStreamToBlockBlobOptions{
+				TransferManager: spm,
+			},
+		},
 	}
 
 	for _, test := range tests {
```
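Both built-in managers appear in these tests: NewStaticBuffer(size, max) preallocates a fixed set of buffers, giving a hard memory ceiling of size*max, while NewSyncPool(size, concurrency) recycles buffers through a sync.Pool and caps in-flight uploads at concurrency. A small sketch of choosing between them (the 8 MiB / 5-way numbers are illustrative, echoing the test case above):

```go
package transfer

import "github.com/Azure/azure-storage-blob-go/azblob"

// pickManager sketches the trade-off between the two built-in TransferManagers.
func pickManager(fixedMemory bool) (azblob.TransferManager, error) {
	const bufSize = 8 * 1024 * 1024 // 8 MiB per block, as in the test above

	if fixedMemory {
		// Hard ceiling: at most 5 buffers (40 MiB) are ever allocated.
		return azblob.NewStaticBuffer(bufSize, 5)
	}
	// Elastic: buffers come and go via sync.Pool; 5 uploads may run at once.
	return azblob.NewSyncPool(bufSize, 5)
}
```

Either way, the returned manager should be closed once the upload finishes, as the tests do with defer spm.Close().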
