
chunked: use temporary file for tar-split data #2312

Merged
3 changes: 2 additions & 1 deletion drivers/driver.go
@@ -216,7 +216,7 @@ type DriverWithDifferOutput struct {
CompressedDigest digest.Digest
Metadata string
BigData map[string][]byte
TarSplit []byte // nil if not available
TarSplit *os.File // nil if not available
TOCDigest digest.Digest
// RootDirMode is the mode of the root directory of the layer, if specified.
RootDirMode *os.FileMode
@@ -267,6 +267,7 @@ type DifferOptions struct {
// This API is experimental and can be changed without bumping the major version number.
type Differ interface {
ApplyDiff(dest string, options *archive.TarOptions, differOpts *DifferOptions) (DriverWithDifferOutput, error)
Close() error
}

// DriverWithDiffer is the interface for direct diff access.
6 changes: 5 additions & 1 deletion layers.go
@@ -2550,10 +2550,14 @@ func (r *layerStore) applyDiffFromStagingDirectory(id string, diffOutput *driver
if err != nil {
compressor = pgzip.NewWriter(&tsdata)
}
if _, err := diffOutput.TarSplit.Seek(0, 0); err != nil {
Review comment (Collaborator): io.SeekStart, throughout

Reply (Collaborator): In #2328.

return err
}

if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
}
if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
if _, err := diffOutput.TarSplit.WriteTo(compressor); err != nil {
compressor.Close()
return err
}
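
Editor's note: the review exchange above is about the whence argument to Seek. A minimal sketch of the suggested spelling, using the named constant io.SeekStart (which equals 0) instead of the literal; the reply indicates this is addressed in #2328:

```go
// Same behavior, but with the named whence constant the reviewer suggests.
if _, err := diffOutput.TarSplit.Seek(0, io.SeekStart); err != nil {
    return err
}
```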
82 changes: 69 additions & 13 deletions pkg/chunked/compression_linux.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"maps"
"os"
"slices"
"strconv"
"time"
@@ -18,6 +19,7 @@ import (
"github.com/vbatts/tar-split/archive/tar"
"github.com/vbatts/tar-split/tar/asm"
"github.com/vbatts/tar-split/tar/storage"
"golang.org/x/sys/unix"
)

const (
@@ -157,10 +159,33 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
return manifestUncompressed, tocOffset, nil
}

func openTmpFile(tmpDir string) (*os.File, error) {
file, err := os.OpenFile(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC|unix.O_EXCL, 0o600)
Review comment (Collaborator): (Contra #2312 (comment)) this seems to work, but AFAIK nothing promises that passing syscall.O_… flags to os.OpenFile is supported, let alone flags from a “third-party” x/sys/unix.O_….

Reply (Collaborator): In #2328.

if err == nil {
return file, nil
}
return openTmpFileNoTmpFile(tmpDir)
}

// openTmpFileNoTmpFile is a fallback used by openTmpFile when the underlying file system does not
// support O_TMPFILE.
func openTmpFileNoTmpFile(tmpDir string) (*os.File, error) {
file, err := os.CreateTemp(tmpDir, ".tmpfile")
if err != nil {
return nil, err
}
// Unlink the file immediately so that only the open fd refers to it.
_ = os.Remove(file.Name())
return file, nil
}
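
Editor's note: the review comment above doubts that passing unix.O_* flags through os.OpenFile is a supported combination. A hedged alternative sketch, assuming the raw unix.Open wrapper is acceptable here (the pre-existing code in storage_linux.go used the same call), would open the anonymous file directly and wrap the descriptor; the function name is hypothetical:

```go
// openTmpFileViaUnixOpen is a hypothetical variant of openTmpFile that avoids
// handing unix.O_* flags to os.OpenFile: it opens the anonymous O_TMPFILE file
// via the raw syscall wrapper, then wraps the file descriptor in an *os.File.
func openTmpFileViaUnixOpen(tmpDir string) (*os.File, error) {
    fd, err := unix.Open(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
    if err != nil {
        return nil, err
    }
    return os.NewFile(uintptr(fd), tmpDir), nil
}
```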

// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream.
// Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset).
// tmpDir is a directory where the tar-split temporary file is written to. The file is opened with
// O_TMPFILE so that it is automatically removed when it is closed.
// Returns (manifest blob, parsed manifest, tar-split file or nil, manifest offset). The opened tar-split file
// points to the end of the file (equivalent to Seek(0, 2)).
Review comment (Collaborator): io.SeekEnd

Reply (Collaborator): #2328 removes that instead.

// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ []byte, _ int64, retErr error) {
func readZstdChunkedManifest(tmpDir string, blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ *os.File, _ int64, retErr error) {
offsetMetadata := annotations[minimal.ManifestInfoKey]
if offsetMetadata == "" {
return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", minimal.ManifestInfoKey)
@@ -245,7 +270,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
return nil, nil, nil, 0, fmt.Errorf("unmarshaling TOC: %w", err)
}

var decodedTarSplit []byte = nil
var decodedTarSplit *os.File
if toc.TarSplitDigest != "" {
if tarSplitChunk.Offset <= 0 {
return nil, nil, nil, 0, fmt.Errorf("TOC requires a tar-split, but the %s annotation does not describe a position", minimal.TarSplitInfoKey)
Expand All @@ -254,14 +279,19 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
if err != nil {
return nil, nil, nil, 0, err
}
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, tarSplitLengthUncompressed, toc.TarSplitDigest.String())
decodedTarSplit, err = openTmpFile(tmpDir)
if err != nil {
return nil, nil, nil, 0, err
}
if err := decodeAndValidateBlobToStream(tarSplit, decodedTarSplit, toc.TarSplitDigest.String()); err != nil {
decodedTarSplit.Close()
return nil, nil, nil, 0, fmt.Errorf("validating and decompressing tar-split: %w", err)
}
// We use the TOC for creating on-disk files, but the tar-split for creating metadata
// when exporting the layer contents. Ensure the two match, otherwise local inspection of a container
// might be misleading about the exported contents.
if err := ensureTOCMatchesTarSplit(toc, decodedTarSplit); err != nil {
decodedTarSplit.Close()
return nil, nil, nil, 0, fmt.Errorf("tar-split and TOC data is inconsistent: %w", err)
}
} else if tarSplitChunk.Offset > 0 {
@@ -278,7 +308,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
}

// ensureTOCMatchesTarSplit validates that toc and tarSplit contain _exactly_ the same entries.
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit *os.File) error {
pendingFiles := map[string]*minimal.FileMetadata{} // Name -> an entry in toc.Entries
for i := range toc.Entries {
e := &toc.Entries[i]
@@ -290,7 +320,11 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
}
}

unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
if _, err := tarSplit.Seek(0, 0); err != nil {
return err
}

unpacker := storage.NewJSONUnpacker(tarSplit)
if err := asm.IterateHeaders(unpacker, func(hdr *tar.Header) error {
e, ok := pendingFiles[hdr.Name]
if !ok {
@@ -320,10 +354,10 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
}

// tarSizeFromTarSplit computes the total tarball size, using only the tarSplit metadata
func tarSizeFromTarSplit(tarSplit []byte) (int64, error) {
func tarSizeFromTarSplit(tarSplit io.Reader) (int64, error) {
var res int64 = 0

unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
unpacker := storage.NewJSONUnpacker(tarSplit)
for {
entry, err := unpacker.Next()
if err != nil {
@@ -433,22 +467,29 @@ func ensureFileMetadataAttributesMatch(a, b *minimal.FileMetadata) error {
return nil
}

func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
func validateBlob(blob []byte, expectedCompressedChecksum string) error {
d, err := digest.Parse(expectedCompressedChecksum)
if err != nil {
return nil, fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
return fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
}

blobDigester := d.Algorithm().Digester()
blobChecksum := blobDigester.Hash()
if _, err := blobChecksum.Write(blob); err != nil {
return nil, err
return err
}
if blobDigester.Digest() != d {
return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
return fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
}
return nil
}

func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
return nil, err
}

decoder, err := zstd.NewReader(nil) //nolint:contextcheck
decoder, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}
Expand All @@ -457,3 +498,18 @@ func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompr
b := make([]byte, 0, lengthUncompressed)
return decoder.DecodeAll(blob, b)
}

func decodeAndValidateBlobToStream(blob []byte, w *os.File, expectedCompressedChecksum string) error {
if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
return err
}

decoder, err := zstd.NewReader(bytes.NewReader(blob)) //nolint:contextcheck
Review comment (Collaborator): Does the lint suppression match anything?

Reply (Collaborator): In #2328.


if err != nil {
return err
}
defer decoder.Close()

_, err = decoder.WriteTo(w)
return err
}
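
Editor's note: taken together, the new helpers replace the old in-memory DecodeAll path with a write-then-rewind lifecycle. A rough caller-side sketch, assuming placeholder names (compressedTarSplit is the compressed blob, expectedDigest its digest string, tmpDir the scratch directory), with error handling abbreviated:

```go
// Stream the validated, decompressed tar-split into an unlinked temporary file.
tarSplitFile, err := openTmpFile(tmpDir)
if err != nil {
    return err
}
if err := decodeAndValidateBlobToStream(compressedTarSplit, tarSplitFile, expectedDigest); err != nil {
    tarSplitFile.Close()
    return err
}
// The write leaves the offset at EOF, so rewind before reading the file back.
if _, err := tarSplitFile.Seek(0, io.SeekStart); err != nil {
    tarSplitFile.Close()
    return err
}
unpacker := storage.NewJSONUnpacker(tarSplitFile) // tar-split's storage package, as imported above
_ = unpacker                                      // iterate entries as the callers in this file do
```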
25 changes: 24 additions & 1 deletion pkg/chunked/compression_linux_test.go
@@ -2,7 +2,9 @@ package chunked

import (
"bytes"
"fmt"
"io"
"os"
"testing"

"github.com/stretchr/testify/assert"
@@ -39,7 +41,28 @@ func TestTarSizeFromTarSplit(t *testing.T) {
_, err = io.Copy(io.Discard, tsReader)
require.NoError(t, err)

res, err := tarSizeFromTarSplit(tarSplit.Bytes())
res, err := tarSizeFromTarSplit(&tarSplit)
require.NoError(t, err)
assert.Equal(t, expectedTarSize, res)
}

func TestOpenTmpFile(t *testing.T) {
tmpDir := t.TempDir()
for range 1000 {
// scope for cleanup
f := func(fn func(tmpDir string) (*os.File, error)) {
file, err := fn(tmpDir)
assert.NoError(t, err)
defer file.Close()

path, err := os.Readlink(fmt.Sprintf("/proc/self/fd/%d", file.Fd()))
assert.NoError(t, err)

// the path under /proc/self/fd/$FD has the suffix "(deleted)" when the file
// is unlinked
assert.Contains(t, path, "(deleted)")
}
f(openTmpFile)
f(openTmpFileNoTmpFile)
}
}
30 changes: 21 additions & 9 deletions pkg/chunked/storage_linux.go
@@ -2,7 +2,6 @@ package chunked

import (
archivetar "archive/tar"
"bytes"
"context"
"encoding/base64"
"errors"
@@ -89,7 +88,7 @@ type chunkedDiffer struct {
tocOffset int64
manifest []byte
toc *minimal.TOC // The parsed contents of manifest, or nil if not yet available
tarSplit []byte
tarSplit *os.File
uncompressedTarSize int64 // -1 if unknown
// skipValidation is set to true if the individual files in
// the layer are trusted and should not be validated.
@@ -164,13 +163,11 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o

defer diff.Close()

fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
f, err := openTmpFile(destDirectory)
if err != nil {
return 0, nil, "", nil, &fs.PathError{Op: "open", Path: destDirectory, Err: err}
return 0, nil, "", nil, err
}

f := os.NewFile(uintptr(fd), destDirectory)

newAnnotations := make(map[string]string)
level := 1
chunked, err := compressor.ZstdCompressor(f, newAnnotations, &level)
Expand All @@ -193,6 +190,15 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
return copied, newSeekableFile(f), convertedOutputDigester.Digest(), newAnnotations, nil
}

func (c *chunkedDiffer) Close() error {
if c.tarSplit != nil {
err := c.tarSplit.Close()
c.tarSplit = nil
return err
}
return nil
}
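
Editor's note: with Close added to the Differ interface, the caller that obtains the differ also owns the tar-split temp file. A hypothetical caller-side sketch; GetDiffer's full parameter list is not shown in this diff, so the arguments here are placeholders:

```go
differ, err := chunked.GetDiffer(ctx, store, blobDigest, blobSize, annotations, iss)
if err != nil {
    return err
}
// Close releases the O_TMPFILE-backed tar-split file, if one was opened.
defer differ.Close()
```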

// GetDiffer returns a differ that can be used with ApplyDiffWithDiffer.
// If it returns an error that matches ErrFallbackToOrdinaryLayerDownload, the caller can
// retry the operation with a different method.
@@ -333,13 +339,16 @@ func makeConvertFromRawDiffer(store storage.Store, blobDigest digest.Digest, blo
// makeZstdChunkedDiffer sets up a chunkedDiffer for a zstd:chunked layer.
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
func makeZstdChunkedDiffer(store storage.Store, blobSize int64, tocDigest digest.Digest, annotations map[string]string, iss ImageSourceSeekable, pullOptions pullOptions) (*chunkedDiffer, error) {
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, tocDigest, annotations)
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(store.RunRoot(), iss, tocDigest, annotations)
Review comment (Collaborator): RunRoot is, by default, in /run … so the net total on this code path is that it moves the uncompressed tar-split from process memory (which can be swapped out, and is limited by swap size) to a tmpfs (also limited by swap size).

It's still a bit of an improvement, because we have more direct control over the allocation, use, and lifetime of the memory, but does it substantially change the actual limit?

if err != nil { // May be ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
}

var uncompressedTarSize int64 = -1
if tarSplit != nil {
if _, err := tarSplit.Seek(0, 0); err != nil {
return nil, err
}
uncompressedTarSize, err = tarSizeFromTarSplit(tarSplit)
if err != nil {
return nil, fmt.Errorf("computing size from tar-split: %w", err)
@@ -1435,7 +1444,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
if tocDigest == nil {
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("internal error: just-created zstd:chunked missing TOC digest")
}
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, *tocDigest, annotations)
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(dest, fileSource, *tocDigest, annotations)
if err != nil {
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
}
@@ -1842,7 +1851,10 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
case c.pullOptions.insecureAllowUnpredictableImageContents:
// Oh well. Skip the costly digest computation.
case output.TarSplit != nil:
metadata := tsStorage.NewJSONUnpacker(bytes.NewReader(output.TarSplit))
if _, err := output.TarSplit.Seek(0, 0); err != nil {
return output, err
}
metadata := tsStorage.NewJSONUnpacker(output.TarSplit)
fg := newStagedFileGetter(dirFile, flatPathNameMap)
digester := digest.Canonical.Digester()
if err := asm.WriteOutputTarStream(fg, metadata, digester.Hash()); err != nil {
2 changes: 1 addition & 1 deletion pkg/chunked/zstdchunked_test.go
@@ -179,7 +179,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
tocDigest, err := toc.GetTOCDigest(annotations)
require.NoError(t, err)
require.NotNil(t, tocDigest)
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(s, *tocDigest, annotations)
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(t.TempDir(), s, *tocDigest, annotations)
require.NoError(t, err)

var toc minimal.TOC