[compression] NewZstdCompressingWriter #9521

Closed
wants to merge 12 commits into from
4 changes: 2 additions & 2 deletions server/util/bytebufferpool/bytebufferpool.go
@@ -112,8 +112,8 @@ type VariableWriteBufPool struct {
	maxBufferSize int
}

-// NewVariableWriteBufPool returns a byte buffer pool that can be used when the
-// buffer size is not fixed. It internally maintains pools of different
+// NewVariableWriteBufPool returns a pool of bufio.Writers.
+// It internally maintains pools of different
// bufio.WriteBuffers in different sizes to accommodate the requested size.
func NewVariableWriteBufPool(maxBufferSize int) *VariableWriteBufPool {
	bp := &VariableWriteBufPool{}
3 changes: 3 additions & 0 deletions server/util/compression/BUILD
@@ -6,7 +6,10 @@ go_library(
    importpath = "github.com/buildbuddy-io/buildbuddy/server/util/compression",
    visibility = ["//visibility:public"],
    deps = [
        "//server/interfaces",
        "//server/metrics",
        "//server/util/bytebufferpool",
        "//server/util/ioutil",
        "//server/util/log",
        "@com_github_klauspost_compress//zstd",
        "@com_github_prometheus_client_golang//prometheus",
74 changes: 74 additions & 0 deletions server/util/compression/compression.go
@@ -1,17 +1,23 @@
package compression

import (
	"bufio"
	"errors"
	"io"
	"runtime"
	"sync"

	"github.com/buildbuddy-io/buildbuddy/server/interfaces"
	"github.com/buildbuddy-io/buildbuddy/server/metrics"
	"github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
	"github.com/buildbuddy-io/buildbuddy/server/util/ioutil"
	"github.com/buildbuddy-io/buildbuddy/server/util/log"
	"github.com/klauspost/compress/zstd"
	"github.com/prometheus/client_golang/prometheus"
)

const compressChunkSize = 4 * 1024 * 1024 // 4MB

var (
	// zstdEncoder can be shared across goroutines to compress chunks of data
	// using EncodeAll. Streaming functions such as encoder.ReadFrom or io.Copy
@@ -22,6 +28,13 @@ var (
	// either for streaming decompression using ReadFrom or batch decompression
	// using DecodeAll. The returned decoders *must not* be closed.
	zstdDecoderPool = NewZstdDecoderPool()

	compressBufPool = bytebufferpool.FixedSize(compressChunkSize)
	writerPool = sync.Pool{
Contributor: nit: maybe call this bufWriterPool

		New: func() any {
			return bufio.NewWriterSize(io.Discard, compressChunkSize)
		},
	}
)

func mustGetZstdEncoder() *zstd.Encoder {
@@ -177,6 +190,67 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf
	}, nil
}

type compressingWriter struct {
	w               io.Writer
	compressBuf     []byte
	poolCompressBuf []byte
}

// Write compresses p in chunks of at most cap(c.compressBuf) bytes, writing
// each compressed chunk to the underlying writer as its own zstd frame.
func (c *compressingWriter) Write(p []byte) (int, error) {
	var totalWritten int
	for len(p) > 0 {
		chunkSize := min(len(p), cap(c.compressBuf))
		chunk := p[:chunkSize]
		c.compressBuf = CompressZstd(c.compressBuf[:0], chunk)

		written, err := c.w.Write(c.compressBuf)
		if err != nil {
			return totalWritten, err
		}
		if written < len(c.compressBuf) {
			return totalWritten, io.ErrShortWrite
		}

		totalWritten += chunkSize
		p = p[chunkSize:]
	}
	return totalWritten, nil
}

// Close puts the compression buffer back into the pool.
// If the underlying Writer is also a Closer, it closes it.
func (c *compressingWriter) Close() error {
Contributor: Write and Close should probably both fail if Close was already called. At the very least, if Close was already called and we call Close again, we shouldn't put the compress buffer in the pool again.

	compressBufPool.Put(c.poolCompressBuf)
	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}
	return nil
Contributor: should this also close c.w?

Contributor Author: I guess so? I guess NewZstdCompressingWriter takes a Writer and returns a WriteCloser, and it would be more consistent if it took a WriteCloser as input.

Contributor: Oh nvm, I didn't realize you're accepting a writer. In that case, it's ok for the caller to be responsible for closing the writer, if it needs to be closed.

On the other hand, this makes it harder for the caller if they need to return the writer further up the stream. Maybe you can just add

	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}

And then document that if the passed in writer is a Closer, it will be closed when the returned writer is closed.

Contributor Author: Oh, I like it!

}
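
Two reviewers suggest guarding against double-Close (see the comment above and the one on CloseFn below). A minimal sketch of that guard, assuming a hypothetical closed bool field is added to compressingWriter (not part of this PR as written):

func (c *compressingWriter) Close() error {
	if c.closed {
		// Never return the pooled buffer twice.
		return errors.New("compressingWriter: already closed")
	}
	c.closed = true
	compressBufPool.Put(c.poolCompressBuf)
	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}

Write could check the same field and fail early, so no data is compressed into a buffer that has already been returned to the pool.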

// NewZstdCompressingWriter returns a writer that compresses each chunk of the
// input using zstd and writes the compressed data to the underlying writer.
// The writer uses a fixed-size 4MB buffer for compression.
bduffany (Member, Jun 2, 2025): My interpretation of this is that if I do several writes of 1 byte each, they will be buffered up to 4MB and then that 4MB chunk will be compressed in a single batch, but the current impl is that each call to Write() would create a separate compression frame for each byte, which would be inefficient.

bduffany (Member, Jun 2, 2025): I think maybe we might want to have a ZstdEncoderPool (similar to ZstdDecoderPool), where each encoder has the max concurrency option set to 1, and then pool the encoders, instead of pooling byte buffers. That way, we let the zstd library optimally handle compression frame allocation for us (based on the compression level), instead of introducing our own 4MB compression window on top of the compression window that zstd is already using internally.

Contributor Author: @bduffany I changed the implementation to use a bufio.Writer, so all writes up to 4MB will in fact be buffered.

bduffany (Member): I sort of mentioned this in my follow-up comment (not sure if you saw it) but it seems better to pool the encoders and let the compression library handle buffering, instead of implementing this buffering ourselves?

Contributor Author: Ack, gotcha! Yes, that'd be pretty sweet.
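
A rough sketch of the pooled-encoder idea described above (hypothetical; not what this PR implements). It uses zstd.WithEncoderConcurrency and Encoder.Reset from the klauspost zstd package this file already imports; the getEncoder/putEncoder helper names are made up for illustration:

// encoderPool pools single-threaded streaming encoders so that the zstd
// library manages buffering and frame sizing internally.
var encoderPool = sync.Pool{
	New: func() any {
		// Concurrency 1 keeps each pooled encoder small and goroutine-free.
		enc, err := zstd.NewWriter(io.Discard, zstd.WithEncoderConcurrency(1))
		if err != nil {
			panic(err) // static options; should never fail
		}
		return enc
	},
}

func getEncoder(w io.Writer) *zstd.Encoder {
	enc := encoderPool.Get().(*zstd.Encoder)
	enc.Reset(w) // retarget the pooled encoder at the new destination
	return enc
}

func putEncoder(enc *zstd.Encoder) {
	enc.Reset(io.Discard) // drop the reference to the caller's writer
	encoderPool.Put(enc)
}

The compressing writer would then write through the encoder and call enc.Close() to flush the final frame before returning the encoder to the pool.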

func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser {
	compressBuf := compressBufPool.Get()
	compressor := &compressingWriter{
		w:               w,
		compressBuf:     compressBuf,
		poolCompressBuf: compressBuf,
	}
	bw := writerPool.Get().(*bufio.Writer)
	bw.Reset(compressor)
	cwc := ioutil.NewCustomCommitWriteCloser(bw)
	cwc.CommitFn = func(_ int64) error {
		return bw.Flush()
	}
	cwc.CloseFn = func() error {
		err := compressor.Close()
		writerPool.Put(bw)
Contributor: same thing here about not adding to the pool twice if close is called twice. If you add a compressingWriter.closed field, you could just check that.

		return err
	}
	return cwc
}
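
For reference, a minimal usage sketch mirroring the compressWithNewZstdCompressingWriter test helper below; compressInto is a hypothetical caller:

func compressInto(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	cw := compression.NewZstdCompressingWriter(&buf)
	if _, err := cw.Write(data); err != nil {
		return nil, err
	}
	// Commit flushes the internal bufio.Writer through the compressor.
	if err := cw.Commit(); err != nil {
		return nil, err
	}
	// Close returns pooled resources and closes the wrapped writer if it
	// implements io.Closer (bytes.Buffer does not, so it is left as-is).
	if err := cw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}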

// NewZstdDecompressingReader reads zstd-compressed data from the input
// reader and makes the decompressed data available on the output reader. The
// output reader is also an io.WriterTo, which can often prevent allocations
44 changes: 38 additions & 6 deletions server/util/compression/compression_test.go
@@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"math"
"strconv"
"testing"

@@ -50,18 +49,39 @@ func TestLossless(t *testing.T) {
			compress:   compressWithNewZstdCompressingReader,
			decompress: decompressWithNewZstdDecompressingReader,
		},
		{
			name:       "NewZstdCompressingWriter -> DecompressZstd",
			compress:   compressWithNewZstdCompressingWriter,
			decompress: decompressWithDecompressZstd,
		},
		{
			name:       "NewZstdCompressingWriter -> NewZstdDecompressor",
			compress:   compressWithNewZstdCompressingWriter,
			decompress: decompressWithNewZstdDecompressor,
		},
		{
			name:       "NewZstdCompressingWriter -> NewZstdDecompressingReader",
			compress:   compressWithNewZstdCompressingWriter,
			decompress: decompressWithNewZstdDecompressingReader,
		},
	} {
-		for i := 1; i <= 5; i++ {
-			srclen := int(math.Pow10(i))
+		for _, srclen := range []int{9, 99, 999, 1_999_999, 5_999_999} {
			name := tc.name + "_" + strconv.Itoa(srclen) + "_bytes"
			t.Run(name, func(t *testing.T) {
				_, r := testdigest.NewReader(t, int64(srclen))
				src, err := io.ReadAll(r)
				require.NoError(t, err)
-				require.Len(t, src, srclen)
+				require.Equal(t, srclen, len(src))
				compressed := tc.compress(t, src)

				decompressed := tc.decompress(t, len(src), compressed)
				require.Len(t, decompressed, srclen)
				if srclen > 1000 {
					require.Empty(t, cmp.Diff(src[:1000], decompressed[:1000]))
					require.Empty(t, cmp.Diff(src[len(src)-1000:], decompressed[len(decompressed)-1000:]))
					return
				}
				require.Empty(t, cmp.Diff(src, decompressed))
			})
		}
@@ -88,6 +108,19 @@ func compressWithNewZstdCompressingReader(t *testing.T, src []byte) []byte {
	return compressed
}

func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte {
	compressed := &bytes.Buffer{}
	cw := compression.NewZstdCompressingWriter(compressed)
	written, err := cw.Write(src)
	require.NoError(t, err)
	require.Equal(t, len(src), written)
	err = cw.Commit()
	require.NoError(t, err)
	err = cw.Close()
	require.NoError(t, err)
	return compressed.Bytes()
}

func decompressWithDecompressZstd(t *testing.T, srclen int, compressed []byte) []byte {
	decompressed := make([]byte, srclen)
	decompressed, err := compression.DecompressZstd(decompressed, compressed)
@@ -111,10 +144,9 @@ func decompressWithNewZstdDecompressingReader(t *testing.T, srclen int, compress
	rc := io.NopCloser(bytes.NewReader(compressed))
	d, err := compression.NewZstdDecompressingReader(rc)
	require.NoError(t, err)
-	buf := make([]byte, srclen)
-	n, err := d.Read(buf)
+	buf, err := io.ReadAll(d)
	require.NoError(t, err)
-	require.Equal(t, srclen, n)
+	require.Len(t, buf, srclen)
	err = d.Close()
	require.NoError(t, err)
	err = rc.Close()