
[compression] NewZstdCompressingWriter #9521


Closed
wants to merge 12 commits
4 changes: 2 additions & 2 deletions server/util/bytebufferpool/bytebufferpool.go
@@ -112,8 +112,8 @@ type VariableWriteBufPool struct {
maxBufferSize int
}

-// NewVariableWriteBufPool returns a byte buffer pool that can be used when the
-// buffer size is not fixed. It internally maintains pools of different
+// NewVariableWriteBufPool returns a pool of bufio.Writers.
+// It internally maintains pools of different
// bufio.WriteBuffers in different sizes to accommodate the requested size.
func NewVariableWriteBufPool(maxBufferSize int) *VariableWriteBufPool {
bp := &VariableWriteBufPool{}
2 changes: 2 additions & 0 deletions server/util/compression/BUILD
@@ -6,8 +6,10 @@ go_library(
importpath = "github.com/buildbuddy-io/buildbuddy/server/util/compression",
visibility = ["//visibility:public"],
deps = [
"//server/interfaces",
"//server/metrics",
"//server/util/bytebufferpool",
"//server/util/ioutil",
"//server/util/log",
"@com_github_klauspost_compress//zstd",
"@com_github_prometheus_client_golang//prometheus",
24 changes: 22 additions & 2 deletions server/util/compression/compression.go
@@ -1,13 +1,16 @@
package compression

import (
"bufio"
"errors"
"io"
"runtime"
"sync"

"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/metrics"
"github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
"github.com/buildbuddy-io/buildbuddy/server/util/ioutil"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
@@ -27,6 +30,11 @@ var (
zstdDecoderPool = NewZstdDecoderPool()

compressBufPool = bytebufferpool.FixedSize(compressChunkSize)
+	writerPool = sync.Pool{
Contributor: nit: maybe call this bufWriterPool

+		New: func() any {
+			return bufio.NewWriterSize(io.Discard, compressChunkSize)
+		},
+	}
)

func mustGetZstdEncoder() *zstd.Encoder {
@@ -222,13 +230,25 @@ func (c *compressingWriter) Close() error {
// NewZstdCompressingWriter returns a writer that compresses each chunk of the
// input using zstd and writes the compressed data to the underlying writer.
// The writer uses a fixed-size 4MB buffer for compression.
@bduffany (Member), Jun 2, 2025: My interpretation of this is that if I do several writes of 1 byte each, they will be buffered up to 4MB and then that 4MB chunk will be compressed in a single batch, but the current impl is that each call to Write() would create a separate compression frame for each byte, which would be inefficient.
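
To make the framing concern concrete: with the klauspost zstd API this package already depends on, each EncodeAll call emits a complete, self-contained zstd frame, so compressing every tiny Write independently pays full frame overhead each time. A minimal sketch (not code from this PR):

package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// A nil writer is fine when the encoder is only used via EncodeAll.
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	defer enc.Close()

	a := enc.EncodeAll([]byte("x"), nil) // frame #1
	b := enc.EncodeAll([]byte("y"), nil) // frame #2

	// Two independent frames: each 1-byte payload carries its own frame
	// header and footer, which is the inefficiency described above.
	fmt.Println(len(a), len(b))
}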

@bduffany (Member), Jun 2, 2025: I think maybe we might want to have a ZstdEncoderPool (similar to ZstdDecoderPool), where each encoder has the max concurrency option set to 1, and then pool the encoders, instead of pooling byte buffers. That way, we let the zstd library optimally handle compression frame allocation for us (based on the compression level), instead of introducing our own 4MB compression window on top of the compression window that zstd is already using internally.
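
A rough sketch of that suggestion, assuming a pool of single-threaded encoders; the names zstdEncoderPool and compressStream are illustrative, not from this PR:

package compression

import (
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

var zstdEncoderPool = sync.Pool{
	New: func() any {
		// Concurrency 1 keeps each pooled encoder single-threaded, so its
		// internal buffers stay small and predictable.
		enc, err := zstd.NewWriter(io.Discard, zstd.WithEncoderConcurrency(1))
		if err != nil {
			panic(err) // options are static; construction should not fail
		}
		return enc
	},
}

// compressStream borrows an encoder, streams src through it, and returns
// the encoder to the pool; the zstd library handles all frame buffering.
func compressStream(dst io.Writer, src io.Reader) error {
	enc := zstdEncoderPool.Get().(*zstd.Encoder)
	enc.Reset(dst) // re-arm the pooled encoder onto the new destination
	defer zstdEncoderPool.Put(enc)
	if _, err := io.Copy(enc, src); err != nil {
		return err
	}
	return enc.Close() // flushes the final frame; Reset makes it reusable
}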

Contributor (Author): @bduffany I changed the implementation to use a bufio.Writer, so all writes up to 4MB will in fact be buffered.

Member: I sort of mentioned this in my follow-up comment (not sure if you saw it), but it seems better to pool the encoders and let the compression library handle buffering, instead of implementing this buffering ourselves?

Contributor (Author): Ack, gotcha! Yes, that'd be pretty sweet.

-func NewZstdCompressingWriter(w io.Writer) io.WriteCloser {
+func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser {
	compressBuf := compressBufPool.Get()
-	return &compressingWriter{
+	compressor := &compressingWriter{
		w:               w,
		compressBuf:     compressBuf,
		poolCompressBuf: compressBuf,
	}
+	bw := writerPool.Get().(*bufio.Writer)
+	bw.Reset(compressor)
+	cwc := ioutil.NewCustomCommitWriteCloser(bw)
+	cwc.CommitFn = func(_ int64) error {
+		return bw.Flush()
+	}
+	cwc.CloseFn = func() error {
+		err := compressor.Close()
+		writerPool.Put(bw)
Contributor: same thing here about not adding to the pool twice if close is called twice. If you add a compressingWriter.closed field, you could just check that.

+		return err
+	}
+	return cwc
}

// NewZstdDecompressingReader reads zstd-compressed data from the input
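Picking up the double-Close comment above, a hypothetical sketch of the suggested guard; the closed field and this Close wiring are assumptions, not the PR's final code:

// Assumed addition to compressingWriter, per the review suggestion.
type compressingWriter struct {
	closed bool // set once Close has run
	// ...other fields from the real type elided...
}

func (c *compressingWriter) Close() error {
	if c.closed {
		return nil // idempotent: a second Close does nothing
	}
	c.closed = true
	// real cleanup (flushing, returning compressBuf to its pool) runs once here
	return nil
}

With that flag in place, CloseFn could check compressor.closed before calling writerPool.Put(bw), so the bufio.Writer is returned to the pool at most once.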
2 changes: 2 additions & 0 deletions server/util/compression/compression_test.go
@@ -114,6 +114,8 @@ func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte {
written, err := cw.Write(src)
require.NoError(t, err)
require.Equal(t, len(src), written)
+	err = cw.Commit()
+	require.NoError(t, err)
err = cw.Close()
require.NoError(t, err)
return compressed.Bytes()