diff --git a/drivers/driver.go b/drivers/driver.go
index 06263e750a..b218256300 100644
--- a/drivers/driver.go
+++ b/drivers/driver.go
@@ -216,7 +216,7 @@ type DriverWithDifferOutput struct {
 	CompressedDigest   digest.Digest
 	Metadata           string
 	BigData            map[string][]byte
-	TarSplit           []byte // nil if not available
+	TarSplit           *os.File // nil if not available
 	TOCDigest          digest.Digest
 	// RootDirMode is the mode of the root directory of the layer, if specified.
 	RootDirMode *os.FileMode
@@ -267,6 +267,7 @@ type DifferOptions struct {
 // This API is experimental and can be changed without bumping the major version number.
 type Differ interface {
 	ApplyDiff(dest string, options *archive.TarOptions, differOpts *DifferOptions) (DriverWithDifferOutput, error)
+	Close() error
 }
 
 // DriverWithDiffer is the interface for direct diff access.
diff --git a/layers.go b/layers.go
index e01edaec0c..b0be6c7d84 100644
--- a/layers.go
+++ b/layers.go
@@ -2550,10 +2550,14 @@ func (r *layerStore) applyDiffFromStagingDirectory(id string, diffOutput *driver
 	if err != nil {
 		compressor = pgzip.NewWriter(&tsdata)
 	}
+	if _, err := diffOutput.TarSplit.Seek(0, 0); err != nil {
+		return err
+	}
+
 	if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
 		logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
 	}
-	if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
+	if _, err := diffOutput.TarSplit.WriteTo(compressor); err != nil {
 		compressor.Close()
 		return err
 	}
diff --git a/pkg/chunked/compression_linux.go b/pkg/chunked/compression_linux.go
index 62dd22dfb8..80a62e94cc 100644
--- a/pkg/chunked/compression_linux.go
+++ b/pkg/chunked/compression_linux.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"maps"
+	"os"
 	"slices"
 	"strconv"
 	"time"
@@ -18,6 +19,7 @@ import (
 	"github.com/vbatts/tar-split/archive/tar"
 	"github.com/vbatts/tar-split/tar/asm"
 	"github.com/vbatts/tar-split/tar/storage"
+	"golang.org/x/sys/unix"
 )
 
 const (
@@ -157,10 +159,33 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
 	return manifestUncompressed, tocOffset, nil
 }
 
+func openTmpFile(tmpDir string) (*os.File, error) {
+	file, err := os.OpenFile(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC|unix.O_EXCL, 0o600)
+	if err == nil {
+		return file, nil
+	}
+	return openTmpFileNoTmpFile(tmpDir)
+}
+
+// openTmpFileNoTmpFile is a fallback used by openTmpFile when the underlying file system does not
+// support O_TMPFILE.
+func openTmpFileNoTmpFile(tmpDir string) (*os.File, error) {
+	file, err := os.CreateTemp(tmpDir, ".tmpfile")
+	if err != nil {
+		return nil, err
+	}
+	// Unlink the file immediately so that only the open fd refers to it.
+	_ = os.Remove(file.Name())
+	return file, nil
+}
+
 // readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream.
-// Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset).
+// tmpDir is a directory where the tar-split temporary file is written to. The file is opened with
+// O_TMPFILE so that it is automatically removed when it is closed.
+// Returns (manifest blob, parsed manifest, tar-split file or nil, manifest offset). The opened tar-split file
+// points to the end of the file (equivalent to Seek(0, 2)).
 // It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
-func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ []byte, _ int64, retErr error) {
+func readZstdChunkedManifest(tmpDir string, blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ *os.File, _ int64, retErr error) {
 	offsetMetadata := annotations[minimal.ManifestInfoKey]
 	if offsetMetadata == "" {
 		return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", minimal.ManifestInfoKey)
@@ -245,7 +270,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
 		return nil, nil, nil, 0, fmt.Errorf("unmarshaling TOC: %w", err)
 	}
 
-	var decodedTarSplit []byte = nil
+	var decodedTarSplit *os.File
 	if toc.TarSplitDigest != "" {
 		if tarSplitChunk.Offset <= 0 {
 			return nil, nil, nil, 0, fmt.Errorf("TOC requires a tar-split, but the %s annotation does not describe a position", minimal.TarSplitInfoKey)
@@ -254,14 +279,19 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
 		if err != nil {
 			return nil, nil, nil, 0, err
 		}
-		decodedTarSplit, err = decodeAndValidateBlob(tarSplit, tarSplitLengthUncompressed, toc.TarSplitDigest.String())
+		decodedTarSplit, err = openTmpFile(tmpDir)
 		if err != nil {
+			return nil, nil, nil, 0, err
+		}
+		if err := decodeAndValidateBlobToStream(tarSplit, decodedTarSplit, toc.TarSplitDigest.String()); err != nil {
+			decodedTarSplit.Close()
 			return nil, nil, nil, 0, fmt.Errorf("validating and decompressing tar-split: %w", err)
 		}
 		// We use the TOC for creating on-disk files, but the tar-split for creating metadata
 		// when exporting the layer contents. Ensure the two match, otherwise local inspection of a container
 		// might be misleading about the exported contents.
 		if err := ensureTOCMatchesTarSplit(toc, decodedTarSplit); err != nil {
+			decodedTarSplit.Close()
 			return nil, nil, nil, 0, fmt.Errorf("tar-split and TOC data is inconsistent: %w", err)
 		}
 	} else if tarSplitChunk.Offset > 0 {
@@ -278,7 +308,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
 }
 
 // ensureTOCMatchesTarSplit validates that toc and tarSplit contain _exactly_ the same entries.
-func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
+func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit *os.File) error {
 	pendingFiles := map[string]*minimal.FileMetadata{} // Name -> an entry in toc.Entries
 	for i := range toc.Entries {
 		e := &toc.Entries[i]
@@ -290,7 +320,11 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
 		}
 	}
 
-	unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
+	if _, err := tarSplit.Seek(0, 0); err != nil {
+		return err
+	}
+
+	unpacker := storage.NewJSONUnpacker(tarSplit)
 	if err := asm.IterateHeaders(unpacker, func(hdr *tar.Header) error {
 		e, ok := pendingFiles[hdr.Name]
 		if !ok {
@@ -320,10 +354,10 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
 }
 
 // tarSizeFromTarSplit computes the total tarball size, using only the tarSplit metadata
-func tarSizeFromTarSplit(tarSplit []byte) (int64, error) {
+func tarSizeFromTarSplit(tarSplit io.Reader) (int64, error) {
 	var res int64 = 0
 
-	unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
+	unpacker := storage.NewJSONUnpacker(tarSplit)
 	for {
 		entry, err := unpacker.Next()
 		if err != nil {
@@ -433,22 +467,29 @@ func ensureFileMetadataAttributesMatch(a, b *minimal.FileMetadata) error {
 	return nil
 }
 
-func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
+func validateBlob(blob []byte, expectedCompressedChecksum string) error {
 	d, err := digest.Parse(expectedCompressedChecksum)
 	if err != nil {
-		return nil, fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
+		return fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
 	}
 
 	blobDigester := d.Algorithm().Digester()
 	blobChecksum := blobDigester.Hash()
 	if _, err := blobChecksum.Write(blob); err != nil {
-		return nil, err
+		return err
 	}
 	if blobDigester.Digest() != d {
-		return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
+		return fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
+	}
+	return nil
+}
+
+func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
+	if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
+		return nil, err
 	}
 
-	decoder, err := zstd.NewReader(nil) //nolint:contextcheck
+	decoder, err := zstd.NewReader(nil)
 	if err != nil {
 		return nil, err
 	}
@@ -457,3 +498,18 @@ func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompr
 	b := make([]byte, 0, lengthUncompressed)
 	return decoder.DecodeAll(blob, b)
 }
+
+func decodeAndValidateBlobToStream(blob []byte, w *os.File, expectedCompressedChecksum string) error {
+	if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
+		return err
+	}
+
+	decoder, err := zstd.NewReader(bytes.NewReader(blob)) //nolint:contextcheck
+	if err != nil {
+		return err
+	}
+	defer decoder.Close()
+
+	_, err = decoder.WriteTo(w)
+	return err
+}
diff --git a/pkg/chunked/compression_linux_test.go b/pkg/chunked/compression_linux_test.go
index 1b4a18e0db..5cae79dccd 100644
--- a/pkg/chunked/compression_linux_test.go
+++ b/pkg/chunked/compression_linux_test.go
@@ -2,7 +2,9 @@ package chunked
 
 import (
 	"bytes"
+	"fmt"
 	"io"
+	"os"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -39,7 +41,28 @@ func TestTarSizeFromTarSplit(t *testing.T) {
 	_, err = io.Copy(io.Discard, tsReader)
 	require.NoError(t, err)
 
-	res, err := tarSizeFromTarSplit(tarSplit.Bytes())
+	res, err := tarSizeFromTarSplit(&tarSplit)
 	require.NoError(t, err)
 	assert.Equal(t, expectedTarSize, res)
 }
+
+func TestOpenTmpFile(t *testing.T) {
+	tmpDir := t.TempDir()
+	for range 1000 {
+		// scope for cleanup
+		f := func(fn func(tmpDir string) (*os.File, error)) {
+			file, err := fn(tmpDir)
+			assert.NoError(t, err)
+			defer file.Close()
+
+			path, err := os.Readlink(fmt.Sprintf("/proc/self/fd/%d", file.Fd()))
+			assert.NoError(t, err)
+
+			// the path under /proc/self/fd/$FD has the suffix "(deleted)" when the file
+			// is unlinked
+			assert.Contains(t, path, "(deleted)")
+		}
+		f(openTmpFile)
+		f(openTmpFileNoTmpFile)
+	}
+}
diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go
index 75148bf2b2..bbacff9981 100644
--- a/pkg/chunked/storage_linux.go
+++ b/pkg/chunked/storage_linux.go
@@ -2,7 +2,6 @@ package chunked
 
 import (
 	archivetar "archive/tar"
-	"bytes"
 	"context"
 	"encoding/base64"
 	"errors"
@@ -89,7 +88,7 @@ type chunkedDiffer struct {
 	tocOffset           int64
 	manifest            []byte
 	toc                 *minimal.TOC // The parsed contents of manifest, or nil if not yet available
-	tarSplit            []byte
+	tarSplit            *os.File
 	uncompressedTarSize int64 // -1 if unknown
 	// skipValidation is set to true if the individual files in
 	// the layer are trusted and should not be validated.
@@ -164,13 +163,11 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
 
 	defer diff.Close()
 
-	fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
+	f, err := openTmpFile(destDirectory)
 	if err != nil {
-		return 0, nil, "", nil, &fs.PathError{Op: "open", Path: destDirectory, Err: err}
+		return 0, nil, "", nil, err
 	}
 
-	f := os.NewFile(uintptr(fd), destDirectory)
-
 	newAnnotations := make(map[string]string)
 	level := 1
 	chunked, err := compressor.ZstdCompressor(f, newAnnotations, &level)
@@ -193,6 +190,15 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
 	return copied, newSeekableFile(f), convertedOutputDigester.Digest(), newAnnotations, nil
 }
 
+func (c *chunkedDiffer) Close() error {
+	if c.tarSplit != nil {
+		err := c.tarSplit.Close()
+		c.tarSplit = nil
+		return err
+	}
+	return nil
+}
+
 // GetDiffer returns a differ than can be used with ApplyDiffWithDiffer.
 // If it returns an error that matches ErrFallbackToOrdinaryLayerDownload, the caller can
 // retry the operation with a different method.
@@ -333,13 +339,16 @@ func makeConvertFromRawDiffer(store storage.Store, blobDigest digest.Digest, blo
 // makeZstdChunkedDiffer sets up a chunkedDiffer for a zstd:chunked layer.
 // It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
 func makeZstdChunkedDiffer(store storage.Store, blobSize int64, tocDigest digest.Digest, annotations map[string]string, iss ImageSourceSeekable, pullOptions pullOptions) (*chunkedDiffer, error) {
-	manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, tocDigest, annotations)
+	manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(store.RunRoot(), iss, tocDigest, annotations)
 	if err != nil { // May be ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert
 		return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
 	}
 	var uncompressedTarSize int64 = -1
 	if tarSplit != nil {
+		if _, err := tarSplit.Seek(0, 0); err != nil {
+			return nil, err
+		}
 		uncompressedTarSize, err = tarSizeFromTarSplit(tarSplit)
 		if err != nil {
 			return nil, fmt.Errorf("computing size from tar-split: %w", err)
		}
@@ -1435,7 +1444,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
 	if tocDigest == nil {
 		return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("internal error: just-created zstd:chunked missing TOC digest")
 	}
-	manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, *tocDigest, annotations)
+	manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(dest, fileSource, *tocDigest, annotations)
 	if err != nil {
 		return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
 	}
@@ -1842,7 +1851,10 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
 	case c.pullOptions.insecureAllowUnpredictableImageContents:
 		// Oh well. Skip the costly digest computation.
 	case output.TarSplit != nil:
-		metadata := tsStorage.NewJSONUnpacker(bytes.NewReader(output.TarSplit))
+		if _, err := output.TarSplit.Seek(0, 0); err != nil {
+			return output, err
+		}
+		metadata := tsStorage.NewJSONUnpacker(output.TarSplit)
 		fg := newStagedFileGetter(dirFile, flatPathNameMap)
 		digester := digest.Canonical.Digester()
 		if err := asm.WriteOutputTarStream(fg, metadata, digester.Hash()); err != nil {
diff --git a/pkg/chunked/zstdchunked_test.go b/pkg/chunked/zstdchunked_test.go
index 138ddaef5b..f2834b42d8 100644
--- a/pkg/chunked/zstdchunked_test.go
+++ b/pkg/chunked/zstdchunked_test.go
@@ -179,7 +179,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
 	tocDigest, err := toc.GetTOCDigest(annotations)
 	require.NoError(t, err)
 	require.NotNil(t, tocDigest)
-	manifest, decodedTOC, _, _, err := readZstdChunkedManifest(s, *tocDigest, annotations)
+	manifest, decodedTOC, _, _, err := readZstdChunkedManifest(t.TempDir(), s, *tocDigest, annotations)
 	require.NoError(t, err)
 
 	var toc minimal.TOC