Skip to content

Commit d856331

Browse files
committed
chunked: use temporary file for tar-split data
Replace the in-memory buffer with a O_TMPFILE file. This reduces the memory requirements for a partial pull since the tar-split data can be written to disk. Signed-off-by: Giuseppe Scrivano <[email protected]>
1 parent 1987f2a commit d856331

File tree

6 files changed

+114
-16
lines changed

6 files changed

+114
-16
lines changed

drivers/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ type DriverWithDifferOutput struct {
216216
CompressedDigest digest.Digest
217217
Metadata string
218218
BigData map[string][]byte
219-
TarSplit []byte // nil if not available
219+
TarSplit *os.File // nil if not available
220220
TOCDigest digest.Digest
221221
// RootDirMode is the mode of the root directory of the layer, if specified.
222222
RootDirMode *os.FileMode

layers.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2550,10 +2550,14 @@ func (r *layerStore) applyDiffFromStagingDirectory(id string, diffOutput *driver
25502550
if err != nil {
25512551
compressor = pgzip.NewWriter(&tsdata)
25522552
}
2553+
if _, err := diffOutput.TarSplit.Seek(0, 0); err != nil {
2554+
return err
2555+
}
2556+
25532557
if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
25542558
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
25552559
}
2556-
if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
2560+
if _, err := diffOutput.TarSplit.WriteTo(compressor); err != nil {
25572561
compressor.Close()
25582562
return err
25592563
}

pkg/chunked/compression_linux.go

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"io/fs"
910
"maps"
11+
"math/rand"
12+
"os"
13+
"path/filepath"
1014
"slices"
1115
"strconv"
1216
"time"
@@ -18,6 +22,7 @@ import (
1822
"github.com/vbatts/tar-split/archive/tar"
1923
"github.com/vbatts/tar-split/tar/asm"
2024
"github.com/vbatts/tar-split/tar/storage"
25+
"golang.org/x/sys/unix"
2126
)
2227

2328
const (
@@ -157,10 +162,41 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
157162
return manifestUncompressed, tocOffset, nil
158163
}
159164

165+
func openTmpFile(tmpDir string) (fd int, err error) {
166+
fd, err = unix.Open(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC|unix.O_EXCL, 0o600)
167+
if err == nil {
168+
return fd, nil
169+
}
170+
return openTmpFileNoTmpFile(tmpDir)
171+
}
172+
173+
// openTmpFileNoTmpFile is a fallback used by openTmpFile when the underlying file system does not
174+
// support O_TMPFILE.
175+
func openTmpFileNoTmpFile(tmpDir string) (fd int, err error) {
176+
for i := 0; i < 100; i++ {
177+
name := fmt.Sprintf(".tmpfile-%d", rand.Int63())
178+
path := filepath.Join(tmpDir, name)
179+
180+
fd, err := unix.Open(path, unix.O_RDWR|unix.O_CREAT|unix.O_EXCL|unix.O_CLOEXEC, 0o600)
181+
if err == nil {
182+
// Unlink the file immediately so that only the open fd refers to it.
183+
_ = os.Remove(path)
184+
return fd, nil
185+
}
186+
if !errors.Is(err, os.ErrExist) {
187+
return -1, &fs.PathError{Op: "open", Path: tmpDir, Err: err}
188+
}
189+
}
190+
// report the original error if the fallback failed
191+
return -1, &fs.PathError{Op: "open O_TMPFILE", Path: tmpDir, Err: err}
192+
}
193+
160194
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream.
195+
// tmpDir is a directory where the tar-split temporary file is written to. The file is opened with
196+
// O_TMPFILE so that it is automatically removed when it is closed.
161197
// Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset).
162198
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
163-
func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ []byte, _ int64, retErr error) {
199+
func readZstdChunkedManifest(tmpDir string, blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ *os.File, _ int64, retErr error) {
164200
offsetMetadata := annotations[minimal.ManifestInfoKey]
165201
if offsetMetadata == "" {
166202
return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", minimal.ManifestInfoKey)
@@ -245,7 +281,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
245281
return nil, nil, nil, 0, fmt.Errorf("unmarshaling TOC: %w", err)
246282
}
247283

248-
var decodedTarSplit []byte = nil
284+
var decodedTarSplit *os.File
249285
if toc.TarSplitDigest != "" {
250286
if tarSplitChunk.Offset <= 0 {
251287
return nil, nil, nil, 0, fmt.Errorf("TOC requires a tar-split, but the %s annotation does not describe a position", minimal.TarSplitInfoKey)
@@ -254,14 +290,20 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
254290
if err != nil {
255291
return nil, nil, nil, 0, err
256292
}
257-
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, tarSplitLengthUncompressed, toc.TarSplitDigest.String())
293+
fd, err := openTmpFile(tmpDir)
258294
if err != nil {
295+
return nil, nil, nil, 0, err
296+
}
297+
decodedTarSplit = os.NewFile(uintptr(fd), "decoded-tar-split")
298+
if err := decodeAndValidateBlobToStream(tarSplit, decodedTarSplit, toc.TarSplitDigest.String()); err != nil {
299+
decodedTarSplit.Close()
259300
return nil, nil, nil, 0, fmt.Errorf("validating and decompressing tar-split: %w", err)
260301
}
261302
// We use the TOC for creating on-disk files, but the tar-split for creating metadata
262303
// when exporting the layer contents. Ensure the two match, otherwise local inspection of a container
263304
// might be misleading about the exported contents.
264305
if err := ensureTOCMatchesTarSplit(toc, decodedTarSplit); err != nil {
306+
decodedTarSplit.Close()
265307
return nil, nil, nil, 0, fmt.Errorf("tar-split and TOC data is inconsistent: %w", err)
266308
}
267309
} else if tarSplitChunk.Offset > 0 {
@@ -278,7 +320,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
278320
}
279321

280322
// ensureTOCMatchesTarSplit validates that toc and tarSplit contain _exactly_ the same entries.
281-
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
323+
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit *os.File) error {
282324
pendingFiles := map[string]*minimal.FileMetadata{} // Name -> an entry in toc.Entries
283325
for i := range toc.Entries {
284326
e := &toc.Entries[i]
@@ -290,7 +332,11 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
290332
}
291333
}
292334

293-
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
335+
if _, err := tarSplit.Seek(0, 0); err != nil {
336+
return err
337+
}
338+
339+
unpacker := storage.NewJSONUnpacker(tarSplit)
294340
if err := asm.IterateHeaders(unpacker, func(hdr *tar.Header) error {
295341
e, ok := pendingFiles[hdr.Name]
296342
if !ok {
@@ -320,10 +366,10 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
320366
}
321367

322368
// tarSizeFromTarSplit computes the total tarball size, using only the tarSplit metadata
323-
func tarSizeFromTarSplit(tarSplit []byte) (int64, error) {
369+
func tarSizeFromTarSplit(tarSplit io.Reader) (int64, error) {
324370
var res int64 = 0
325371

326-
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
372+
unpacker := storage.NewJSONUnpacker(tarSplit)
327373
for {
328374
entry, err := unpacker.Next()
329375
if err != nil {
@@ -464,3 +510,18 @@ func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompr
464510
b := make([]byte, 0, lengthUncompressed)
465511
return decoder.DecodeAll(blob, b)
466512
}
513+
514+
func decodeAndValidateBlobToStream(blob []byte, w *os.File, expectedCompressedChecksum string) error {
515+
if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
516+
return err
517+
}
518+
519+
decoder, err := zstd.NewReader(bytes.NewReader(blob)) //nolint:contextcheck
520+
if err != nil {
521+
return err
522+
}
523+
defer decoder.Close()
524+
525+
_, err = decoder.WriteTo(w)
526+
return err
527+
}

pkg/chunked/compression_linux_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ package chunked
22

33
import (
44
"bytes"
5+
"fmt"
56
"io"
7+
"os"
68
"testing"
79

810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/require"
1012
"github.com/vbatts/tar-split/archive/tar"
1113
"github.com/vbatts/tar-split/tar/asm"
1214
"github.com/vbatts/tar-split/tar/storage"
15+
"golang.org/x/sys/unix"
1316
)
1417

1518
func TestTarSizeFromTarSplit(t *testing.T) {
@@ -39,7 +42,27 @@ func TestTarSizeFromTarSplit(t *testing.T) {
3942
_, err = io.Copy(io.Discard, tsReader)
4043
require.NoError(t, err)
4144

42-
res, err := tarSizeFromTarSplit(tarSplit.Bytes())
45+
res, err := tarSizeFromTarSplit(&tarSplit)
4346
require.NoError(t, err)
4447
assert.Equal(t, expectedTarSize, res)
4548
}
49+
50+
func TestOpenTmpFile(t *testing.T) {
51+
for i := 0; i < 1000; i++ {
52+
// scope for cleanup
53+
f := func(fn func(tmpDir string) (int, error)) {
54+
fd, err := fn(t.TempDir())
55+
assert.NoError(t, err)
56+
defer unix.Close(fd)
57+
58+
path, err := os.Readlink(fmt.Sprintf("/proc/self/fd/%d", fd))
59+
assert.NoError(t, err)
60+
61+
// the path under /proc/self/fd/$FD has the prefix "(deleted)" when the file
62+
// is unlinked
63+
assert.Contains(t, path, "(deleted)")
64+
}
65+
f(openTmpFile)
66+
f(openTmpFileNoTmpFile)
67+
}
68+
}

pkg/chunked/storage_linux.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package chunked
22

33
import (
44
archivetar "archive/tar"
5-
"bytes"
65
"context"
76
"encoding/base64"
87
"errors"
@@ -89,7 +88,7 @@ type chunkedDiffer struct {
8988
tocOffset int64
9089
manifest []byte
9190
toc *minimal.TOC // The parsed contents of manifest, or nil if not yet available
92-
tarSplit []byte
91+
tarSplit *os.File
9392
uncompressedTarSize int64 // -1 if unknown
9493
// skipValidation is set to true if the individual files in
9594
// the layer are trusted and should not be validated.
@@ -194,6 +193,11 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
194193
}
195194

196195
func (c *chunkedDiffer) Close() error {
196+
if c.tarSplit != nil {
197+
err := c.tarSplit.Close()
198+
c.tarSplit = nil
199+
return err
200+
}
197201
return nil
198202
}
199203

@@ -337,13 +341,16 @@ func makeConvertFromRawDiffer(store storage.Store, blobDigest digest.Digest, blo
337341
// makeZstdChunkedDiffer sets up a chunkedDiffer for a zstd:chunked layer.
338342
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
339343
func makeZstdChunkedDiffer(store storage.Store, blobSize int64, tocDigest digest.Digest, annotations map[string]string, iss ImageSourceSeekable, pullOptions pullOptions) (*chunkedDiffer, error) {
340-
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, tocDigest, annotations)
344+
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(store.RunRoot(), iss, tocDigest, annotations)
341345
if err != nil { // May be ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert
342346
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
343347
}
344348

345349
var uncompressedTarSize int64 = -1
346350
if tarSplit != nil {
351+
if _, err := tarSplit.Seek(0, 0); err != nil {
352+
return nil, err
353+
}
347354
uncompressedTarSize, err = tarSizeFromTarSplit(tarSplit)
348355
if err != nil {
349356
return nil, fmt.Errorf("computing size from tar-split: %w", err)
@@ -1439,7 +1446,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
14391446
if tocDigest == nil {
14401447
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("internal error: just-created zstd:chunked missing TOC digest")
14411448
}
1442-
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, *tocDigest, annotations)
1449+
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(dest, fileSource, *tocDigest, annotations)
14431450
if err != nil {
14441451
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
14451452
}
@@ -1846,7 +1853,10 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
18461853
case c.pullOptions.insecureAllowUnpredictableImageContents:
18471854
// Oh well. Skip the costly digest computation.
18481855
case output.TarSplit != nil:
1849-
metadata := tsStorage.NewJSONUnpacker(bytes.NewReader(output.TarSplit))
1856+
if _, err := output.TarSplit.Seek(0, 0); err != nil {
1857+
return output, err
1858+
}
1859+
metadata := tsStorage.NewJSONUnpacker(output.TarSplit)
18501860
fg := newStagedFileGetter(dirFile, flatPathNameMap)
18511861
digester := digest.Canonical.Digester()
18521862
if err := asm.WriteOutputTarStream(fg, metadata, digester.Hash()); err != nil {

pkg/chunked/zstdchunked_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
179179
tocDigest, err := toc.GetTOCDigest(annotations)
180180
require.NoError(t, err)
181181
require.NotNil(t, tocDigest)
182-
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(s, *tocDigest, annotations)
182+
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(t.TempDir(), s, *tocDigest, annotations)
183183
require.NoError(t, err)
184184

185185
var toc minimal.TOC

0 commit comments

Comments
 (0)