-
Notifications
You must be signed in to change notification settings - Fork 258
chunked: use temporary file for tar-split data #2312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8ebe960
3724a5a
f5bdfdc
729821c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2550,10 +2550,14 @@ func (r *layerStore) applyDiffFromStagingDirectory(id string, diffOutput *driver | |
if err != nil { | ||
compressor = pgzip.NewWriter(&tsdata) | ||
} | ||
if _, err := diffOutput.TarSplit.Seek(0, 0); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In #2328 . |
||
return err | ||
} | ||
|
||
if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that | ||
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err) | ||
} | ||
if _, err := compressor.Write(diffOutput.TarSplit); err != nil { | ||
if _, err := diffOutput.TarSplit.WriteTo(compressor); err != nil { | ||
compressor.Close() | ||
return err | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"fmt" | ||
"io" | ||
"maps" | ||
"os" | ||
"slices" | ||
"strconv" | ||
"time" | ||
|
@@ -18,6 +19,7 @@ import ( | |
"github.com/vbatts/tar-split/archive/tar" | ||
"github.com/vbatts/tar-split/tar/asm" | ||
"github.com/vbatts/tar-split/tar/storage" | ||
"golang.org/x/sys/unix" | ||
) | ||
|
||
const ( | ||
|
@@ -157,10 +159,33 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64, | |
return manifestUncompressed, tocOffset, nil | ||
} | ||
|
||
func openTmpFile(tmpDir string) (*os.File, error) { | ||
file, err := os.OpenFile(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC|unix.O_EXCL, 0o600) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Contra #2312 (comment) ) this seems to work, but AFAIK nothing promises that passing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In #2328 . |
||
if err == nil { | ||
return file, nil | ||
} | ||
return openTmpFileNoTmpFile(tmpDir) | ||
} | ||
|
||
// openTmpFileNoTmpFile is a fallback used by openTmpFile when the underlying file system does not | ||
// support O_TMPFILE. | ||
func openTmpFileNoTmpFile(tmpDir string) (*os.File, error) { | ||
kolyshkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
file, err := os.CreateTemp(tmpDir, ".tmpfile") | ||
if err != nil { | ||
return nil, err | ||
} | ||
// Unlink the file immediately so that only the open fd refers to it. | ||
_ = os.Remove(file.Name()) | ||
return file, nil | ||
} | ||
|
||
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. | ||
// Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset). | ||
// tmpDir is a directory where the tar-split temporary file is written to. The file is opened with | ||
// O_TMPFILE so that it is automatically removed when it is closed. | ||
// Returns (manifest blob, parsed manifest, tar-split file or nil, manifest offset). The opened tar-split file | ||
// points to the end of the file (equivalent to Seek(0, 2)). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #2328 removes that instead. |
||
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert. | ||
func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ []byte, _ int64, retErr error) { | ||
func readZstdChunkedManifest(tmpDir string, blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ *os.File, _ int64, retErr error) { | ||
offsetMetadata := annotations[minimal.ManifestInfoKey] | ||
if offsetMetadata == "" { | ||
return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", minimal.ManifestInfoKey) | ||
|
@@ -245,7 +270,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di | |
return nil, nil, nil, 0, fmt.Errorf("unmarshaling TOC: %w", err) | ||
} | ||
|
||
var decodedTarSplit []byte = nil | ||
var decodedTarSplit *os.File | ||
if toc.TarSplitDigest != "" { | ||
if tarSplitChunk.Offset <= 0 { | ||
return nil, nil, nil, 0, fmt.Errorf("TOC requires a tar-split, but the %s annotation does not describe a position", minimal.TarSplitInfoKey) | ||
|
@@ -254,14 +279,19 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di | |
if err != nil { | ||
return nil, nil, nil, 0, err | ||
} | ||
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, tarSplitLengthUncompressed, toc.TarSplitDigest.String()) | ||
decodedTarSplit, err = openTmpFile(tmpDir) | ||
if err != nil { | ||
return nil, nil, nil, 0, err | ||
giuseppe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if err := decodeAndValidateBlobToStream(tarSplit, decodedTarSplit, toc.TarSplitDigest.String()); err != nil { | ||
decodedTarSplit.Close() | ||
giuseppe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nil, nil, nil, 0, fmt.Errorf("validating and decompressing tar-split: %w", err) | ||
} | ||
// We use the TOC for creating on-disk files, but the tar-split for creating metadata | ||
// when exporting the layer contents. Ensure the two match, otherwise local inspection of a container | ||
// might be misleading about the exported contents. | ||
if err := ensureTOCMatchesTarSplit(toc, decodedTarSplit); err != nil { | ||
decodedTarSplit.Close() | ||
return nil, nil, nil, 0, fmt.Errorf("tar-split and TOC data is inconsistent: %w", err) | ||
} | ||
} else if tarSplitChunk.Offset > 0 { | ||
|
@@ -278,7 +308,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di | |
} | ||
|
||
// ensureTOCMatchesTarSplit validates that toc and tarSplit contain _exactly_ the same entries. | ||
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error { | ||
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit *os.File) error { | ||
pendingFiles := map[string]*minimal.FileMetadata{} // Name -> an entry in toc.Entries | ||
for i := range toc.Entries { | ||
e := &toc.Entries[i] | ||
|
@@ -290,7 +320,11 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error { | |
} | ||
} | ||
|
||
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit)) | ||
if _, err := tarSplit.Seek(0, 0); err != nil { | ||
return err | ||
} | ||
|
||
unpacker := storage.NewJSONUnpacker(tarSplit) | ||
if err := asm.IterateHeaders(unpacker, func(hdr *tar.Header) error { | ||
e, ok := pendingFiles[hdr.Name] | ||
if !ok { | ||
|
@@ -320,10 +354,10 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error { | |
} | ||
|
||
// tarSizeFromTarSplit computes the total tarball size, using only the tarSplit metadata | ||
func tarSizeFromTarSplit(tarSplit []byte) (int64, error) { | ||
func tarSizeFromTarSplit(tarSplit io.Reader) (int64, error) { | ||
var res int64 = 0 | ||
|
||
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit)) | ||
unpacker := storage.NewJSONUnpacker(tarSplit) | ||
for { | ||
entry, err := unpacker.Next() | ||
if err != nil { | ||
|
@@ -433,22 +467,29 @@ func ensureFileMetadataAttributesMatch(a, b *minimal.FileMetadata) error { | |
return nil | ||
} | ||
|
||
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) { | ||
func validateBlob(blob []byte, expectedCompressedChecksum string) error { | ||
d, err := digest.Parse(expectedCompressedChecksum) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err) | ||
return fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err) | ||
} | ||
|
||
blobDigester := d.Algorithm().Digester() | ||
blobChecksum := blobDigester.Hash() | ||
if _, err := blobChecksum.Write(blob); err != nil { | ||
return nil, err | ||
return err | ||
} | ||
if blobDigester.Digest() != d { | ||
return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest()) | ||
return fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest()) | ||
} | ||
return nil | ||
} | ||
|
||
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) { | ||
if err := validateBlob(blob, expectedCompressedChecksum); err != nil { | ||
return nil, err | ||
} | ||
|
||
decoder, err := zstd.NewReader(nil) //nolint:contextcheck | ||
decoder, err := zstd.NewReader(nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -457,3 +498,18 @@ func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompr | |
b := make([]byte, 0, lengthUncompressed) | ||
return decoder.DecodeAll(blob, b) | ||
} | ||
|
||
func decodeAndValidateBlobToStream(blob []byte, w *os.File, expectedCompressedChecksum string) error { | ||
if err := validateBlob(blob, expectedCompressedChecksum); err != nil { | ||
return err | ||
} | ||
|
||
decoder, err := zstd.NewReader(bytes.NewReader(blob)) //nolint:contextcheck | ||
giuseppe marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does the lint suppression match anything? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In #2328. |
||
if err != nil { | ||
return err | ||
} | ||
defer decoder.Close() | ||
|
||
_, err = decoder.WriteTo(w) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ package chunked | |
|
||
import ( | ||
archivetar "archive/tar" | ||
"bytes" | ||
"context" | ||
"encoding/base64" | ||
"errors" | ||
|
@@ -89,7 +88,7 @@ type chunkedDiffer struct { | |
tocOffset int64 | ||
manifest []byte | ||
toc *minimal.TOC // The parsed contents of manifest, or nil if not yet available | ||
tarSplit []byte | ||
tarSplit *os.File | ||
uncompressedTarSize int64 // -1 if unknown | ||
// skipValidation is set to true if the individual files in | ||
// the layer are trusted and should not be validated. | ||
|
@@ -164,13 +163,11 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o | |
|
||
defer diff.Close() | ||
|
||
fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600) | ||
f, err := openTmpFile(destDirectory) | ||
if err != nil { | ||
return 0, nil, "", nil, &fs.PathError{Op: "open", Path: destDirectory, Err: err} | ||
return 0, nil, "", nil, err | ||
} | ||
|
||
f := os.NewFile(uintptr(fd), destDirectory) | ||
|
||
newAnnotations := make(map[string]string) | ||
level := 1 | ||
chunked, err := compressor.ZstdCompressor(f, newAnnotations, &level) | ||
|
@@ -193,6 +190,15 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o | |
return copied, newSeekableFile(f), convertedOutputDigester.Digest(), newAnnotations, nil | ||
} | ||
|
||
// Close closes the tar-split file held by the differ, if there is one. | ||
// It returns the error from closing that file, or nil when no file is set. | ||
func (c *chunkedDiffer) Close() error { | ||
if c.tarSplit != nil { | ||
err := c.tarSplit.Close() | ||
// Clear the field so that a later Close call takes the nil branch and does not close the file again. | ||
c.tarSplit = nil | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// GetDiffer returns a differ that can be used with ApplyDiffWithDiffer. | ||
// If it returns an error that matches ErrFallbackToOrdinaryLayerDownload, the caller can | ||
// retry the operation with a different method. | ||
|
@@ -333,13 +339,16 @@ func makeConvertFromRawDiffer(store storage.Store, blobDigest digest.Digest, blo | |
// makeZstdChunkedDiffer sets up a chunkedDiffer for a zstd:chunked layer. | ||
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert. | ||
func makeZstdChunkedDiffer(store storage.Store, blobSize int64, tocDigest digest.Digest, annotations map[string]string, iss ImageSourceSeekable, pullOptions pullOptions) (*chunkedDiffer, error) { | ||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, tocDigest, annotations) | ||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(store.RunRoot(), iss, tocDigest, annotations) | ||
mtrmac marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It’s still a bit of an improvement, because we have somewhat more direct control over the allocation / uses / lifetime of the memory — but does it substantially change the actual limit? |
||
if err != nil { // May be ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert | ||
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err) | ||
} | ||
|
||
var uncompressedTarSize int64 = -1 | ||
if tarSplit != nil { | ||
if _, err := tarSplit.Seek(0, 0); err != nil { | ||
return nil, err | ||
} | ||
uncompressedTarSize, err = tarSizeFromTarSplit(tarSplit) | ||
if err != nil { | ||
return nil, fmt.Errorf("computing size from tar-split: %w", err) | ||
|
@@ -1435,7 +1444,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff | |
if tocDigest == nil { | ||
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("internal error: just-created zstd:chunked missing TOC digest") | ||
} | ||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, *tocDigest, annotations) | ||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(dest, fileSource, *tocDigest, annotations) | ||
if err != nil { | ||
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err) | ||
} | ||
|
@@ -1842,7 +1851,10 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff | |
case c.pullOptions.insecureAllowUnpredictableImageContents: | ||
// Oh well. Skip the costly digest computation. | ||
case output.TarSplit != nil: | ||
metadata := tsStorage.NewJSONUnpacker(bytes.NewReader(output.TarSplit)) | ||
if _, err := output.TarSplit.Seek(0, 0); err != nil { | ||
return output, err | ||
} | ||
metadata := tsStorage.NewJSONUnpacker(output.TarSplit) | ||
fg := newStagedFileGetter(dirFile, flatPathNameMap) | ||
digester := digest.Canonical.Digester() | ||
if err := asm.WriteOutputTarStream(fg, metadata, digester.Hash()); err != nil { | ||
|
Uh oh!
There was an error while loading. Please reload this page.