Skip to content

Commit c366c90

Browse files
authored
feat(storage/transfermanager): checksum full object downloads (#10569)
1 parent 123c886 commit c366c90

File tree

3 files changed

+267
-71
lines changed

3 files changed

+267
-71
lines changed

storage/transfermanager/downloader.go

+217-64
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"hash"
22+
"hash/crc32"
2123
"io"
2224
"io/fs"
2325
"math"
@@ -31,6 +33,12 @@ import (
3133
"google.golang.org/api/iterator"
3234
)
3335

36+
// maxChecksumZeroArraySize caps, in bytes, how much memory is allocated for
// updating the checksum while computing the crc32c of a full object. Raising
// it costs more memory but means fewer checksum updates are needed.
// TODO: test the performance of smaller values for this.
const maxChecksumZeroArraySize = 4 << 20
41+
3442
// Downloader manages a set of parallelized downloads.
3543
type Downloader struct {
3644
client *storage.Client
@@ -288,7 +296,7 @@ func (d *Downloader) addNewInputs(inputs []DownloadObjectInput) {
288296
}
289297

290298
func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) {
291-
copiedResult := *result // make a copy so that callbacks do not affect the result
299+
copiedResult := *result // make a copy so that callbacks do not affect the result
292300

293301
if input.directory {
294302
f := input.Destination.(*os.File)
@@ -305,7 +313,6 @@ func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutpu
305313
input.directoryObjectOutputs <- copiedResult
306314
}
307315
}
308-
// TODO: check checksum if full object
309316

310317
if d.config.asynchronous || input.directory {
311318
input.Callback(result)
@@ -337,27 +344,10 @@ func (d *Downloader) downloadWorker() {
337344
break // no more work; exit
338345
}
339346

340-
out := input.downloadShard(d.client, d.config.perOperationTimeout, d.config.partSize)
341-
342347
if input.shard == 0 {
343-
if out.Err != nil {
344-
// Don't queue more shards if the first failed.
345-
d.addResult(input, out)
346-
} else {
347-
numShards := numShards(out.Attrs, input.Range, d.config.partSize)
348-
349-
if numShards <= 1 {
350-
// Download completed with a single shard.
351-
d.addResult(input, out)
352-
} else {
353-
// Queue more shards.
354-
outs := d.queueShards(input, out.Attrs.Generation, numShards)
355-
// Start a goroutine that gathers shards sent to the output
356-
// channel and adds the result once it has received all shards.
357-
go d.gatherShards(input, outs, numShards)
358-
}
359-
}
348+
d.startDownload(input)
360349
} else {
350+
out := input.downloadShard(d.client, d.config.perOperationTimeout, d.config.partSize)
361351
// If this isn't the first shard, send to the output channel specific to the object.
362352
// This should never block since the channel is buffered to exactly the number of shards.
363353
input.shardOutputs <- out
@@ -366,6 +356,47 @@ func (d *Downloader) downloadWorker() {
366356
d.workers.Done()
367357
}
368358

359+
// startDownload downloads the first shard and schedules subsequent shards
360+
// if necessary.
361+
func (d *Downloader) startDownload(input *DownloadObjectInput) {
362+
var out *DownloadOutput
363+
364+
// Full object read. Request the full object and only read partSize bytes
365+
// (or the full object, if smaller than partSize), so that we can avoid a
366+
// metadata call to grab the CRC32C for JSON downloads.
367+
if fullObjectRead(input.Range) {
368+
input.checkCRC = true
369+
out = input.downloadFirstShard(d.client, d.config.perOperationTimeout, d.config.partSize)
370+
} else {
371+
out = input.downloadShard(d.client, d.config.perOperationTimeout, d.config.partSize)
372+
}
373+
374+
if out.Err != nil {
375+
// Don't queue more shards if the first failed.
376+
d.addResult(input, out)
377+
return
378+
}
379+
380+
numShards := numShards(out.Attrs, input.Range, d.config.partSize)
381+
input.checkCRC = input.checkCRC && !out.Attrs.Decompressed // do not checksum if the object was decompressed
382+
383+
if numShards > 1 {
384+
outs := d.queueShards(input, out.Attrs.Generation, numShards)
385+
// Start a goroutine that gathers shards sent to the output
386+
// channel and adds the result once it has received all shards.
387+
go d.gatherShards(input, out, outs, numShards, out.crc32c)
388+
389+
} else {
390+
// Download completed with a single shard.
391+
if input.checkCRC {
392+
if err := checksumObject(out.crc32c, out.Attrs.CRC32C); err != nil {
393+
out.Err = err
394+
}
395+
}
396+
d.addResult(input, out)
397+
}
398+
}
399+
369400
// queueShards queues all subsequent shards of an object after the first.
370401
// The results should be forwarded to the returned channel.
371402
func (d *Downloader) queueShards(in *DownloadObjectInput, gen int64, shards int) <-chan *DownloadOutput {
@@ -397,12 +428,12 @@ var errCancelAllShards = errors.New("cancelled because another shard failed")
397428
// It will add the result to the Downloader once it has received all shards.
398429
// gatherShards cancels remaining shards if any shard errored.
399430
// It does not do any checking to verify that shards are for the same object.
400-
func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *DownloadOutput, shards int) {
431+
func (d *Downloader) gatherShards(in *DownloadObjectInput, out *DownloadOutput, outs <-chan *DownloadOutput, shards int, firstPieceCRC uint32) {
401432
errs := []error{}
402-
var shardOut *DownloadOutput
433+
orderedChecksums := make([]crc32cPiece, shards-1)
434+
403435
for i := 1; i < shards; i++ {
404-
// Add monitoring here? This could hang if any individual piece does.
405-
shardOut = <-outs
436+
shardOut := <-outs
406437

407438
// We can ignore errors that resulted from a previous error.
408439
// Note that we may still get some cancel errors if they
@@ -412,20 +443,30 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
412443
errs = append(errs, shardOut.Err)
413444
in.cancelCtx(errCancelAllShards)
414445
}
446+
447+
orderedChecksums[shardOut.shard-1] = crc32cPiece{sum: shardOut.crc32c, length: shardOut.shardLength}
448+
}
449+
450+
// All pieces gathered.
451+
if len(errs) == 0 && in.checkCRC && out.Attrs != nil {
452+
fullCrc := joinCRC32C(firstPieceCRC, orderedChecksums)
453+
if err := checksumObject(fullCrc, out.Attrs.CRC32C); err != nil {
454+
errs = append(errs, err)
455+
}
415456
}
416457

417-
// All pieces gathered; return output. Any shard output will do.
418-
shardOut.Range = in.Range
458+
// Prepare output.
459+
out.Range = in.Range
419460
if len(errs) != 0 {
420-
shardOut.Err = fmt.Errorf("download shard errors:\n%w", errors.Join(errs...))
461+
out.Err = fmt.Errorf("download shard errors:\n%w", errors.Join(errs...))
421462
}
422-
if shardOut.Attrs != nil {
423-
shardOut.Attrs.StartOffset = 0
463+
if out.Attrs != nil {
464+
out.Attrs.StartOffset = 0
424465
if in.Range != nil {
425-
shardOut.Attrs.StartOffset = in.Range.Offset
466+
out.Attrs.StartOffset = in.Range.Offset
426467
}
427468
}
428-
d.addResult(in, shardOut)
469+
d.addResult(in, out)
429470
}
430471

431472
// gatherObjectOutputs receives from the given channel exactly numObjects times.
@@ -563,45 +604,18 @@ type DownloadObjectInput struct {
563604
shardOutputs chan<- *DownloadOutput
564605
directory bool // input was queued by calling DownloadDirectory
565606
directoryObjectOutputs chan<- DownloadOutput
607+
checkCRC bool
566608
}
567609

568610
// downloadShard will read a specific object piece into in.Destination.
569611
// If timeout is less than 0, no timeout is set.
570612
func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration, partSize int64) (out *DownloadOutput) {
571613
out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object, Range: in.Range}
572614

573-
// Set timeout.
574-
ctx := in.ctx
575-
if timeout > 0 {
576-
c, cancel := context.WithTimeout(ctx, timeout)
577-
defer cancel()
578-
ctx = c
579-
}
580-
581-
// The first shard will be sent as download many, since we do not know yet
582-
// if it will be sharded.
583-
method := downloadMany
584-
if in.shard != 0 {
585-
method = downloadSharded
586-
}
587-
ctx = setUsageMetricHeader(ctx, method)
588-
589-
// Set options on the object.
590-
o := client.Bucket(in.Bucket).Object(in.Object)
591-
592-
if in.Conditions != nil {
593-
o = o.If(*in.Conditions)
594-
}
595-
if in.Generation != nil {
596-
o = o.Generation(*in.Generation)
597-
}
598-
if len(in.EncryptionKey) > 0 {
599-
o = o.Key(in.EncryptionKey)
600-
}
601-
602615
objRange := shardRange(in.Range, partSize, in.shard)
616+
ctx := in.setOptionsOnContext(timeout)
617+
o := in.setOptionsOnObject(client)
603618

604-
// Read.
605619
r, err := o.NewRangeReader(ctx, objRange.Offset, objRange.Length)
606620
if err != nil {
607621
out.Err = err
@@ -618,9 +632,63 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
618632
}
619633
}
620634

621-
w := io.NewOffsetWriter(in.Destination, offset)
622-
_, err = io.Copy(w, r)
635+
var w io.Writer
636+
w = io.NewOffsetWriter(in.Destination, offset)
637+
638+
var crcHash hash.Hash32
639+
if in.checkCRC {
640+
crcHash = crc32.New(crc32.MakeTable(crc32.Castagnoli))
641+
w = io.MultiWriter(w, crcHash)
642+
}
643+
644+
n, err := io.Copy(w, r)
645+
if err != nil {
646+
out.Err = err
647+
r.Close()
648+
return
649+
}
650+
651+
if err = r.Close(); err != nil {
652+
out.Err = err
653+
return
654+
}
655+
656+
out.Attrs = &r.Attrs
657+
out.shard = in.shard
658+
out.shardLength = n
659+
if in.checkCRC {
660+
out.crc32c = crcHash.Sum32()
661+
}
662+
return
663+
}
664+
665+
// downloadFirstShard will read the first object piece into in.Destination.
666+
// If timeout is less than 0, no timeout is set.
667+
func (in *DownloadObjectInput) downloadFirstShard(client *storage.Client, timeout time.Duration, partSize int64) (out *DownloadOutput) {
668+
out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object, Range: in.Range}
669+
670+
ctx := in.setOptionsOnContext(timeout)
671+
o := in.setOptionsOnObject(client)
672+
673+
r, err := o.NewReader(ctx)
623674
if err != nil {
675+
out.Err = err
676+
return
677+
}
678+
679+
var w io.Writer
680+
w = io.NewOffsetWriter(in.Destination, 0)
681+
682+
var crcHash hash.Hash32
683+
if in.checkCRC {
684+
crcHash = crc32.New(crc32.MakeTable(crc32.Castagnoli))
685+
w = io.MultiWriter(w, crcHash)
686+
}
687+
688+
// Copy only the first partSize bytes before closing the reader.
689+
// If we encounter an EOF, the file was smaller than partSize.
690+
n, err := io.CopyN(w, r, partSize)
691+
if err != nil && err != io.EOF {
624692
out.Err = err
625693
r.Close()
626694
return
@@ -632,9 +700,45 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
632700
}
633701

634702
out.Attrs = &r.Attrs
703+
out.shard = in.shard
704+
out.shardLength = n
705+
if in.checkCRC {
706+
out.crc32c = crcHash.Sum32()
707+
}
635708
return
636709
}
637710

711+
func (in *DownloadObjectInput) setOptionsOnContext(timeout time.Duration) context.Context {
712+
ctx := in.ctx
713+
if timeout > 0 {
714+
c, cancel := context.WithTimeout(ctx, timeout)
715+
defer cancel()
716+
ctx = c
717+
}
718+
719+
// The first shard will be sent as download many, since we do not know yet
720+
// if it will be sharded.
721+
method := downloadMany
722+
if in.shard != 0 {
723+
method = downloadSharded
724+
}
725+
return setUsageMetricHeader(ctx, method)
726+
}
727+
728+
func (in *DownloadObjectInput) setOptionsOnObject(client *storage.Client) *storage.ObjectHandle {
729+
o := client.Bucket(in.Bucket).Object(in.Object)
730+
if in.Conditions != nil {
731+
o = o.If(*in.Conditions)
732+
}
733+
if in.Generation != nil {
734+
o = o.Generation(*in.Generation)
735+
}
736+
if len(in.EncryptionKey) > 0 {
737+
o = o.Key(in.EncryptionKey)
738+
}
739+
return o
740+
}
741+
638742
// DownloadDirectoryInput is the input for a directory to download.
639743
type DownloadDirectoryInput struct {
640744
// Bucket is the bucket in GCS to download from. Required.
@@ -686,6 +790,10 @@ type DownloadOutput struct {
686790
Range *DownloadRange // requested range, if it was specified
687791
Err error // error occurring during download
688792
Attrs *storage.ReaderObjectAttrs // attributes of downloaded object, if successful
793+
794+
shard int
795+
shardLength int64
796+
crc32c uint32
689797
}
690798

691799
// TODO: use built-in after go < 1.21 is dropped.
@@ -784,3 +892,48 @@ func setUsageMetricHeader(ctx context.Context, method string) context.Context {
784892
header := fmt.Sprintf("%s/%s", usageMetricKey, method)
785893
return callctx.SetHeaders(ctx, xGoogHeaderKey, header)
786894
}
895+
896+
// crc32cPiece pairs the checksum of one piece of an object with that piece's
// size, which is what joinCRC32C needs to fold the piece into a running
// checksum.
type crc32cPiece struct {
	// sum is the crc32c checksum of the piece.
	sum uint32
	// length is the number of bytes in this piece.
	length int64
}
900+
901+
// joinCRC32C pieces together the initial checksum with the orderedChecksums
902+
// provided to calculate the checksum of the whole.
903+
func joinCRC32C(initialChecksum uint32, orderedChecksums []crc32cPiece) uint32 {
904+
base := initialChecksum
905+
906+
zeroes := make([]byte, maxChecksumZeroArraySize)
907+
for _, part := range orderedChecksums {
908+
// Precondition Base (flip every bit)
909+
base ^= 0xFFFFFFFF
910+
911+
// Zero pad base crc32c. To conserve memory, do so with only maxChecksumZeroArraySize
912+
// at a time. Reuse the zeroes array where possible.
913+
var padded int64 = 0
914+
for padded < part.length {
915+
desiredZeroes := min(part.length-padded, maxChecksumZeroArraySize)
916+
base = crc32.Update(base, crc32.MakeTable(crc32.Castagnoli), zeroes[:desiredZeroes])
917+
padded += desiredZeroes
918+
}
919+
920+
// Postcondition Base (same as precondition, this switches the bits back)
921+
base ^= 0xFFFFFFFF
922+
923+
// Bitwise OR between Base and Part to produce a new Base
924+
base ^= part.sum
925+
}
926+
return base
927+
}
928+
929+
func fullObjectRead(r *DownloadRange) bool {
930+
return r == nil || (r.Offset == 0 && r.Length < 0)
931+
}
932+
933+
// checksumObject compares a computed CRC32C (got) with the expected one
// (want) and returns an error when they differ. A want of 0 is treated as
// "no valid CRC32C available", in which case nothing is compared and nil is
// returned.
func checksumObject(got, want uint32) error {
	if want == 0 || got == want {
		return nil
	}
	return fmt.Errorf("bad CRC on read: got %d, want %d", got, want)
}

0 commit comments

Comments
 (0)