From 51a4066adbc03d266b366b00103985619423c1e9 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Fri, 30 May 2025 14:41:05 -0500 Subject: [PATCH 01/12] [compression] NewZstdCompressingWriter --- server/util/compression/BUILD | 1 + server/util/compression/compression.go | 49 +++++++++++++++++++++ server/util/compression/compression_test.go | 42 +++++++++++++++--- 3 files changed, 86 insertions(+), 6 deletions(-) diff --git a/server/util/compression/BUILD b/server/util/compression/BUILD index 79742e4a6de..b5baa190597 100644 --- a/server/util/compression/BUILD +++ b/server/util/compression/BUILD @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//server/metrics", + "//server/util/bytebufferpool", "//server/util/log", "@com_github_klauspost_compress//zstd", "@com_github_prometheus_client_golang//prometheus", diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 0003bbe1bb9..c8c542e4cd3 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -7,11 +7,14 @@ import ( "sync" "github.com/buildbuddy-io/buildbuddy/server/metrics" + "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" ) +const compressChunkSize = 4 * 1024 * 1024 // 4MB + var ( // zstdEncoder can be shared across goroutines to compress chunks of data // using EncodeAll. Streaming functions such as encoder.ReadFrom or io.Copy @@ -22,6 +25,8 @@ var ( // either for streaming decompression using ReadFrom or batch decompression // using DecodeAll. The returned decoders *must not* be closed. zstdDecoderPool = NewZstdDecoderPool() + + compressBufPool = bytebufferpool.FixedSize(compressChunkSize) ) func mustGetZstdEncoder() *zstd.Encoder { @@ -177,6 +182,50 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf }, nil } +type compressingWriter struct { + w io.Writer + compressBuf []byte + poolCompressBuf []byte +} + +func (c *compressingWriter) Write(p []byte) (int, error) { + var totalWritten int + for len(p) > 0 { + chunkSize := min(len(p), cap(c.compressBuf)) + chunk := p[:chunkSize] + c.compressBuf = CompressZstd(c.compressBuf[:0], chunk) + + written, err := c.w.Write(c.compressBuf) + if err != nil { + return totalWritten, err + } + if written < len(c.compressBuf) { + return totalWritten, io.ErrShortWrite + } + + totalWritten += chunkSize + p = p[chunkSize:] + } + return totalWritten, nil +} + +func (c *compressingWriter) Close() error { + compressBufPool.Put(c.poolCompressBuf) + return nil +} + +// NewZstdCompressingWriter returns a writer that compresses each chunk of the +// input using zstd and writes the compressed data to the underlying writer. +// The writer uses a fixed-size 4MB buffer for compression. +func NewZstdCompressingWriter(w io.Writer) io.WriteCloser { + compressBuf := compressBufPool.Get() + return &compressingWriter{ + w: w, + compressBuf: compressBuf, + poolCompressBuf: compressBuf, + } +} + // NewZstdDecompressingReader reads zstd-compressed data from the input // reader and makes the decompressed data available on the output reader. The // output reader is also an io.WriterTo, which can often prevent allocations diff --git a/server/util/compression/compression_test.go b/server/util/compression/compression_test.go index 1c68219484c..390b748e399 100644 --- a/server/util/compression/compression_test.go +++ b/server/util/compression/compression_test.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "math" "strconv" "testing" @@ -50,18 +49,39 @@ func TestLossless(t *testing.T) { compress: compressWithNewZstdCompressingReader, decompress: decompressWithNewZstdDecompressingReader, }, + { + name: "NewZstdCompressingWriter -> DecompressZstd", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithDecompressZstd, + }, + { + name: "NewZstdCompressingWriter -> NewZstdDecompressor", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithNewZstdDecompressor, + }, + { + name: "NewZstdCompressingWriter -> NewZstdDecompressingReader", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithNewZstdDecompressingReader, + }, } { - for i := 1; i <= 5; i++ { - srclen := int(math.Pow10(i)) + for _, srclen := range []int{9, 99, 999, 1_999_999, 5_999_999} { name := tc.name + "_" + strconv.Itoa(srclen) + "_bytes" t.Run(name, func(t *testing.T) { _, r := testdigest.NewReader(t, int64(srclen)) src, err := io.ReadAll(r) require.NoError(t, err) + require.Len(t, src, srclen) require.Equal(t, srclen, len(src)) compressed := tc.compress(t, src) decompressed := tc.decompress(t, len(src), compressed) + require.Len(t, decompressed, srclen) + if srclen > 1000 { + require.Empty(t, cmp.Diff(src[:1000], decompressed[:1000])) + require.Empty(t, cmp.Diff(src[len(src)-1000:], decompressed[len(decompressed)-1000:])) + return + } require.Empty(t, cmp.Diff(src, decompressed)) }) } @@ -88,6 +108,17 @@ func compressWithNewZstdCompressingReader(t *testing.T, src []byte) []byte { return compressed } +func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte { + compressed := &bytes.Buffer{} + cw := compression.NewZstdCompressingWriter(compressed) + written, err := cw.Write(src) + require.NoError(t, err) + require.Equal(t, len(src), written) + err = cw.Close() + require.NoError(t, err) + return compressed.Bytes() +} + func decompressWithDecompressZstd(t *testing.T, srclen int, compressed []byte) []byte { decompressed := make([]byte, srclen) decompressed, err := compression.DecompressZstd(decompressed, compressed) @@ -111,10 +142,9 @@ func decompressWithNewZstdDecompressingReader(t *testing.T, srclen int, compress rc := io.NopCloser(bytes.NewReader(compressed)) d, err := compression.NewZstdDecompressingReader(rc) require.NoError(t, err) - buf := make([]byte, srclen) - n, err := d.Read(buf) + buf, err := io.ReadAll(d) require.NoError(t, err) - require.Equal(t, srclen, n) + require.Len(t, buf, srclen) err = d.Close() require.NoError(t, err) err = rc.Close() From 03f7b601c14ccb1a9112f11a3146fe1b5df27a60 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Fri, 30 May 2025 15:52:13 -0500 Subject: [PATCH 02/12] Close underlying WriteCloser --- server/util/compression/compression.go | 6 +++--- server/util/compression/compression_test.go | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index c8c542e4cd3..db088d83f6d 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -183,7 +183,7 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf } type compressingWriter struct { - w io.Writer + w io.WriteCloser compressBuf []byte poolCompressBuf []byte } @@ -211,13 +211,13 @@ func (c *compressingWriter) Write(p []byte) (int, error) { func (c *compressingWriter) Close() error { compressBufPool.Put(c.poolCompressBuf) - return nil + return c.w.Close() } // NewZstdCompressingWriter returns a writer that compresses each chunk of the // input using zstd and writes the compressed data to the underlying writer. // The writer uses a fixed-size 4MB buffer for compression. -func NewZstdCompressingWriter(w io.Writer) io.WriteCloser { +func NewZstdCompressingWriter(w io.WriteCloser) io.WriteCloser { compressBuf := compressBufPool.Get() return &compressingWriter{ w: w, diff --git a/server/util/compression/compression_test.go b/server/util/compression/compression_test.go index 390b748e399..96d3ce811a0 100644 --- a/server/util/compression/compression_test.go +++ b/server/util/compression/compression_test.go @@ -108,9 +108,17 @@ func compressWithNewZstdCompressingReader(t *testing.T, src []byte) []byte { return compressed } +type nopWriteCloser struct { + io.Writer +} + +func (w *nopWriteCloser) Close() error { + return nil +} + func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte { compressed := &bytes.Buffer{} - cw := compression.NewZstdCompressingWriter(compressed) + cw := compression.NewZstdCompressingWriter(&nopWriteCloser{compressed}) written, err := cw.Write(src) require.NoError(t, err) require.Equal(t, len(src), written) From b4723326d0a511d7c184a729a50100ce7346d4b0 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Fri, 30 May 2025 15:53:35 -0500 Subject: [PATCH 03/12] Revert "Close underlying WriteCloser" This reverts commit 03f7b601c14ccb1a9112f11a3146fe1b5df27a60. --- server/util/compression/compression.go | 6 +++--- server/util/compression/compression_test.go | 10 +--------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index db088d83f6d..c8c542e4cd3 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -183,7 +183,7 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf } type compressingWriter struct { - w io.WriteCloser + w io.Writer compressBuf []byte poolCompressBuf []byte } @@ -211,13 +211,13 @@ func (c *compressingWriter) Write(p []byte) (int, error) { func (c *compressingWriter) Close() error { compressBufPool.Put(c.poolCompressBuf) - return c.w.Close() + return nil } // NewZstdCompressingWriter returns a writer that compresses each chunk of the // input using zstd and writes the compressed data to the underlying writer. // The writer uses a fixed-size 4MB buffer for compression. -func NewZstdCompressingWriter(w io.WriteCloser) io.WriteCloser { +func NewZstdCompressingWriter(w io.Writer) io.WriteCloser { compressBuf := compressBufPool.Get() return &compressingWriter{ w: w, diff --git a/server/util/compression/compression_test.go b/server/util/compression/compression_test.go index 96d3ce811a0..390b748e399 100644 --- a/server/util/compression/compression_test.go +++ b/server/util/compression/compression_test.go @@ -108,17 +108,9 @@ func compressWithNewZstdCompressingReader(t *testing.T, src []byte) []byte { return compressed } -type nopWriteCloser struct { - io.Writer -} - -func (w *nopWriteCloser) Close() error { - return nil -} - func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte { compressed := &bytes.Buffer{} - cw := compression.NewZstdCompressingWriter(&nopWriteCloser{compressed}) + cw := compression.NewZstdCompressingWriter(compressed) written, err := cw.Write(src) require.NoError(t, err) require.Equal(t, len(src), written) From 99548a642abb4714b2db56c54f6e608db9403c73 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Fri, 30 May 2025 15:55:31 -0500 Subject: [PATCH 04/12] close underlying Writer if it is also a Closer --- server/util/compression/compression.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index c8c542e4cd3..1097a11b28e 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -209,8 +209,13 @@ func (c *compressingWriter) Write(p []byte) (int, error) { return totalWritten, nil } +// Close puts the compression buffer back into the pool. +// If the underlying Writer is also a Closer, it closes it. func (c *compressingWriter) Close() error { compressBufPool.Put(c.poolCompressBuf) + if closer, ok := c.w.(io.Closer); ok { + return closer.Close() + } return nil } From e8375c09d30ceb1791edbab45737339317826415 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 11:25:46 -0500 Subject: [PATCH 05/12] buffer writes to compressingWriter --- server/util/bytebufferpool/bytebufferpool.go | 4 ++-- server/util/compression/BUILD | 1 + server/util/compression/compression.go | 22 ++++++++++++++++++-- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/util/bytebufferpool/bytebufferpool.go b/server/util/bytebufferpool/bytebufferpool.go index ca378b33c18..5e079cfa2c4 100644 --- a/server/util/bytebufferpool/bytebufferpool.go +++ b/server/util/bytebufferpool/bytebufferpool.go @@ -112,8 +112,8 @@ type VariableWriteBufPool struct { maxBufferSize int } -// NewVariableWriteBufPool returns a byte buffer pool that can be used when the -// buffer size is not fixed. It internally maintains pools of different +// NewVariableWriteBufPool returns a pool of bufio.Writers. +// It internally maintains pools of different // bufio.WriteBuffers in different sizes to accomadate the requested size. func NewVariableWriteBufPool(maxBufferSize int) *VariableWriteBufPool { bp := &VariableWriteBufPool{} diff --git a/server/util/compression/BUILD b/server/util/compression/BUILD index b5baa190597..bb7162c93b2 100644 --- a/server/util/compression/BUILD +++ b/server/util/compression/BUILD @@ -8,6 +8,7 @@ go_library( deps = [ "//server/metrics", "//server/util/bytebufferpool", + "//server/util/ioutil", "//server/util/log", "@com_github_klauspost_compress//zstd", "@com_github_prometheus_client_golang//prometheus", diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 1097a11b28e..df98c645e0b 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -1,13 +1,16 @@ package compression import ( + "bufio" "errors" "io" "runtime" "sync" + "github.com/buildbuddy-io/buildbuddy/bazel-buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" + "github.com/buildbuddy-io/buildbuddy/server/util/ioutil" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" @@ -27,6 +30,11 @@ var ( zstdDecoderPool = NewZstdDecoderPool() compressBufPool = bytebufferpool.FixedSize(compressChunkSize) + writerPool = sync.Pool{ + New: func() any { + return bufio.NewWriterSize(io.Discard, compressChunkSize) + }, + } ) func mustGetZstdEncoder() *zstd.Encoder { @@ -222,13 +230,23 @@ func (c *compressingWriter) Close() error { // NewZstdCompressingWriter returns a writer that compresses each chunk of the // input using zstd and writes the compressed data to the underlying writer. // The writer uses a fixed-size 4MB buffer for compression. -func NewZstdCompressingWriter(w io.Writer) io.WriteCloser { +func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { compressBuf := compressBufPool.Get() - return &compressingWriter{ + compressor := &compressingWriter{ w: w, compressBuf: compressBuf, poolCompressBuf: compressBuf, } + bw := writerPool.Get().(*bufio.Writer) + bw.Reset(compressor) + cwc := ioutil.NewCustomCommitWriteCloser(bw) + cwc.CommitFn = func(_ int64) error { + return bw.Flush() + } + cwc.CloseFn = func() error { + return compressor.Close() + } + return cwc } // NewZstdDecompressingReader reads zstd-compressed data from the input From ebc0c09d4499019e73ff10fc08c2233399436d71 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 11:27:44 -0500 Subject: [PATCH 06/12] buildfix --- server/util/compression/BUILD | 1 + server/util/compression/compression.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/util/compression/BUILD b/server/util/compression/BUILD index bb7162c93b2..cfc15fc19cf 100644 --- a/server/util/compression/BUILD +++ b/server/util/compression/BUILD @@ -6,6 +6,7 @@ go_library( importpath = "github.com/buildbuddy-io/buildbuddy/server/util/compression", visibility = ["//visibility:public"], deps = [ + "//server/interfaces", "//server/metrics", "//server/util/bytebufferpool", "//server/util/ioutil", diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index df98c645e0b..397fd2b8360 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -7,7 +7,7 @@ import ( "runtime" "sync" - "github.com/buildbuddy-io/buildbuddy/bazel-buildbuddy/server/interfaces" + "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" "github.com/buildbuddy-io/buildbuddy/server/util/ioutil" From 09a568c12c3eedcf57b6df2c5bfb6948df68e9bd Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 11:29:46 -0500 Subject: [PATCH 07/12] commit in tests --- server/util/compression/compression_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/util/compression/compression_test.go b/server/util/compression/compression_test.go index 390b748e399..de7e126fe49 100644 --- a/server/util/compression/compression_test.go +++ b/server/util/compression/compression_test.go @@ -114,6 +114,8 @@ func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte { written, err := cw.Write(src) require.NoError(t, err) require.Equal(t, len(src), written) + err = cw.Commit() + require.NoError(t, err) err = cw.Close() require.NoError(t, err) return compressed.Bytes() From cf73c1f60a6ac1842e9477311af1e452477b138f Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 11:38:25 -0500 Subject: [PATCH 08/12] put writer back in pool --- server/util/compression/compression.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 397fd2b8360..e2b1e63222a 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -244,7 +244,9 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { return bw.Flush() } cwc.CloseFn = func() error { - return compressor.Close() + err := compressor.Close() + writerPool.Put(bw) + return err } return cwc } From 11b3efdc19073ee1e3aed3233e3e0ed58b2a5f82 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 14:51:07 -0500 Subject: [PATCH 09/12] bufWriterPool --- server/util/compression/compression.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index e2b1e63222a..7213e06c7bc 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -30,7 +30,7 @@ var ( zstdDecoderPool = NewZstdDecoderPool() compressBufPool = bytebufferpool.FixedSize(compressChunkSize) - writerPool = sync.Pool{ + bufWriterPool = sync.Pool{ New: func() any { return bufio.NewWriterSize(io.Discard, compressChunkSize) }, @@ -237,7 +237,7 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { compressBuf: compressBuf, poolCompressBuf: compressBuf, } - bw := writerPool.Get().(*bufio.Writer) + bw := bufWriterPool.Get().(*bufio.Writer) bw.Reset(compressor) cwc := ioutil.NewCustomCommitWriteCloser(bw) cwc.CommitFn = func(_ int64) error { @@ -245,7 +245,7 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { } cwc.CloseFn = func() error { err := compressor.Close() - writerPool.Put(bw) + bufWriterPool.Put(bw) return err } return cwc From e0fba1528accd3413961a643623aaf488517ed60 Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 15:31:29 -0500 Subject: [PATCH 10/12] pooledBufWriter --- server/util/compression/compression.go | 65 +++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 7213e06c7bc..5c777bc8974 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -7,6 +7,7 @@ import ( "runtime" "sync" + "github.com/buildbuddy-io/buildbuddy/bazel-buildbuddy/server/util/status" "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" @@ -227,6 +228,57 @@ func (c *compressingWriter) Close() error { return nil } +type pooledBufWriter struct { + w *bufio.Writer + + committed bool + closed bool +} + +func (b *pooledBufWriter) Write(p []byte) (int, error) { + if b.closed { + return status.FailedPreconditionError("pooledBufWriter already closed, cannot receive writes") + } + if b.committed { + return status.FailedPreconditionError("pooledBufWriter already committed, cannot receive writes") + } + return b.w.Write(p) +} + +func (b *pooledBufWriter) ReadFrom(r io.Reader) (int64, error) { + if b.closed { + return status.FailedPreconditionError("pooledBufWriter already closed, cannot ReadFrom") + } + if b.committed { + return status.FailedPreconditionError("pooledBufWriter already committed, cannot ReadFrom") + } + return b.w.ReadFrom(r) +} + +// Commit flushes any buffered bytes. +// It is an error to call Commit after Close. +func (b *pooledBufWriter) Commit() error { + if b.closed { + return status.FailedPreconditionError("pooledBufWriter already closed, cannot commit") + } + if b.committed { + return nil + } + b.committed = true + return b.w.Flush() +} + +// Close puts the bufio.Writer back into the pool. +// It is an error to call Close more than once. +func (b *pooledBufWriter) Close() error { + if b.closed { + return status.FailedPreconditionError("pooledBufWriter already closed, cannot close again") + } + b.closed = true + bufWriterPool.Put(b.w) + return nil +} + // NewZstdCompressingWriter returns a writer that compresses each chunk of the // input using zstd and writes the compressed data to the underlying writer. // The writer uses a fixed-size 4MB buffer for compression. @@ -239,14 +291,13 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { } bw := bufWriterPool.Get().(*bufio.Writer) bw.Reset(compressor) - cwc := ioutil.NewCustomCommitWriteCloser(bw) - cwc.CommitFn = func(_ int64) error { - return bw.Flush() - } + cwc := ioutil.NewCustomCommitWriteCloser( + &pooledBufWriter{ + w: bw, + }, + ) cwc.CloseFn = func() error { - err := compressor.Close() - bufWriterPool.Put(bw) - return err + return compressor.Close() } return cwc } From a4ff2cb83c1b7b013edfffd0236b0fa59b0d4c5d Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 15:32:52 -0500 Subject: [PATCH 11/12] Revert "pooledBufWriter" This reverts commit e0fba1528accd3413961a643623aaf488517ed60. --- server/util/compression/compression.go | 65 +++----------------------- 1 file changed, 7 insertions(+), 58 deletions(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 5c777bc8974..7213e06c7bc 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -7,7 +7,6 @@ import ( "runtime" "sync" - "github.com/buildbuddy-io/buildbuddy/bazel-buildbuddy/server/util/status" "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" @@ -228,57 +227,6 @@ func (c *compressingWriter) Close() error { return nil } -type pooledBufWriter struct { - w *bufio.Writer - - committed bool - closed bool -} - -func (b *pooledBufWriter) Write(p []byte) (int, error) { - if b.closed { - return status.FailedPreconditionError("pooledBufWriter already closed, cannot receive writes") - } - if b.committed { - return status.FailedPreconditionError("pooledBufWriter already committed, cannot receive writes") - } - return b.w.Write(p) -} - -func (b *pooledBufWriter) ReadFrom(r io.Reader) (int64, error) { - if b.closed { - return status.FailedPreconditionError("pooledBufWriter already closed, cannot ReadFrom") - } - if b.committed { - return status.FailedPreconditionError("pooledBufWriter already committed, cannot ReadFrom") - } - return b.w.ReadFrom(r) -} - -// Commit flushes any buffered bytes. -// It is an error to call Commit after Close. -func (b *pooledBufWriter) Commit() error { - if b.closed { - return status.FailedPreconditionError("pooledBufWriter already closed, cannot commit") - } - if b.committed { - return nil - } - b.committed = true - return b.w.Flush() -} - -// Close puts the bufio.Writer back into the pool. -// It is an error to call Close more than once. -func (b *pooledBufWriter) Close() error { - if b.closed { - return status.FailedPreconditionError("pooledBufWriter already closed, cannot close again") - } - b.closed = true - bufWriterPool.Put(b.w) - return nil -} - // NewZstdCompressingWriter returns a writer that compresses each chunk of the // input using zstd and writes the compressed data to the underlying writer. // The writer uses a fixed-size 4MB buffer for compression. @@ -291,13 +239,14 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { } bw := bufWriterPool.Get().(*bufio.Writer) bw.Reset(compressor) - cwc := ioutil.NewCustomCommitWriteCloser( - &pooledBufWriter{ - w: bw, - }, - ) + cwc := ioutil.NewCustomCommitWriteCloser(bw) + cwc.CommitFn = func(_ int64) error { + return bw.Flush() + } cwc.CloseFn = func() error { - return compressor.Close() + err := compressor.Close() + bufWriterPool.Put(bw) + return err } return cwc } From a789d29495d6db1e31d6a294431886cc489f905d Mon Sep 17 00:00:00 2001 From: Dan Stowell Date: Mon, 2 Jun 2025 15:36:31 -0500 Subject: [PATCH 12/12] compressingWriter.closed --- server/util/compression/compression.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 7213e06c7bc..5cec070f9ca 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -7,11 +7,13 @@ import ( "runtime" "sync" + "github.com/buildbuddy-io/buildbuddy/bazel-buildbuddy/server/util/status" "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" "github.com/buildbuddy-io/buildbuddy/server/util/ioutil" "github.com/buildbuddy-io/buildbuddy/server/util/log" + "github.com/buildbuddy-io/buildbuddy/server/util/status" "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" ) @@ -194,10 +196,14 @@ type compressingWriter struct { w io.Writer compressBuf []byte poolCompressBuf []byte + closed bool } func (c *compressingWriter) Write(p []byte) (int, error) { - var totalWritten int + if c.closed { + return 0, status.FailedPreconditionError("compressingWriter already closed, cannot receive writes") + } + totalWritten := 0 for len(p) > 0 { chunkSize := min(len(p), cap(c.compressBuf)) chunk := p[:chunkSize] @@ -220,6 +226,10 @@ func (c *compressingWriter) Write(p []byte) (int, error) { // Close puts the compression buffer back into the pool. // If the underlying Writer is also a Closer, it closes it. func (c *compressingWriter) Close() error { + if c.closed { + return status.FailedPreconditionError("compressingWriter already closed, cannot close again") + } + c.closed = true compressBufPool.Put(c.poolCompressBuf) if closer, ok := c.w.(io.Closer); ok { return closer.Close() @@ -244,6 +254,9 @@ func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { return bw.Flush() } cwc.CloseFn = func() error { + if compressor.closed { + return status.FailedPreconditionError("Writer already closed, cannot close again") + } err := compressor.Close() bufWriterPool.Put(bw) return err