diff --git a/server/util/bytebufferpool/bytebufferpool.go b/server/util/bytebufferpool/bytebufferpool.go index ca378b33c18..5e079cfa2c4 100644 --- a/server/util/bytebufferpool/bytebufferpool.go +++ b/server/util/bytebufferpool/bytebufferpool.go @@ -112,8 +112,8 @@ type VariableWriteBufPool struct { maxBufferSize int } -// NewVariableWriteBufPool returns a byte buffer pool that can be used when the -// buffer size is not fixed. It internally maintains pools of different -// bufio.WriteBuffers in different sizes to accomadate the requested size. +// NewVariableWriteBufPool returns a pool of bufio.Writers. +// It internally maintains pools of different +// bufio.WriteBuffers in different sizes to accommodate the requested size. func NewVariableWriteBufPool(maxBufferSize int) *VariableWriteBufPool { bp := &VariableWriteBufPool{} diff --git a/server/util/compression/BUILD b/server/util/compression/BUILD index 79742e4a6de..cfc15fc19cf 100644 --- a/server/util/compression/BUILD +++ b/server/util/compression/BUILD @@ -6,7 +6,10 @@ go_library( importpath = "github.com/buildbuddy-io/buildbuddy/server/util/compression", visibility = ["//visibility:public"], deps = [ + "//server/interfaces", "//server/metrics", + "//server/util/bytebufferpool", + "//server/util/ioutil", "//server/util/log", "@com_github_klauspost_compress//zstd", "@com_github_prometheus_client_golang//prometheus", diff --git a/server/util/compression/compression.go b/server/util/compression/compression.go index 0003bbe1bb9..5cec070f9ca 100644 --- a/server/util/compression/compression.go +++ b/server/util/compression/compression.go @@ -1,17 +1,24 @@ package compression import ( + "bufio" "errors" "io" "runtime" "sync" + "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" + "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool" + "github.com/buildbuddy-io/buildbuddy/server/util/ioutil" 
"github.com/buildbuddy-io/buildbuddy/server/util/log" + "github.com/buildbuddy-io/buildbuddy/server/util/status" "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" ) +const compressChunkSize = 4 * 1024 * 1024 // 4MB + var ( // zstdEncoder can be shared across goroutines to compress chunks of data // using EncodeAll. Streaming functions such as encoder.ReadFrom or io.Copy @@ -22,6 +30,13 @@ var ( // either for streaming decompression using ReadFrom or batch decompression // using DecodeAll. The returned decoders *must not* be closed. zstdDecoderPool = NewZstdDecoderPool() + + compressBufPool = bytebufferpool.FixedSize(compressChunkSize) + bufWriterPool = sync.Pool{ + New: func() any { + return bufio.NewWriterSize(io.Discard, compressChunkSize) + }, + } ) func mustGetZstdEncoder() *zstd.Encoder { @@ -177,6 +192,78 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf }, nil } +type compressingWriter struct { + w io.Writer + compressBuf []byte + poolCompressBuf []byte + closed bool +} + +func (c *compressingWriter) Write(p []byte) (int, error) { + if c.closed { + return 0, status.FailedPreconditionError("compressingWriter already closed, cannot receive writes") + } + totalWritten := 0 + for len(p) > 0 { + chunkSize := min(len(p), cap(c.compressBuf)) + chunk := p[:chunkSize] + c.compressBuf = CompressZstd(c.compressBuf[:0], chunk) + + written, err := c.w.Write(c.compressBuf) + if err != nil { + return totalWritten, err + } + if written < len(c.compressBuf) { + return totalWritten, io.ErrShortWrite + } + + totalWritten += chunkSize + p = p[chunkSize:] + } + return totalWritten, nil +} + +// Close puts the compression buffer back into the pool. +// If the underlying Writer is also a Closer, it closes it. 
+func (c *compressingWriter) Close() error { + if c.closed { + return status.FailedPreconditionError("compressingWriter already closed, cannot close again") + } + c.closed = true + compressBufPool.Put(c.poolCompressBuf) + if closer, ok := c.w.(io.Closer); ok { + return closer.Close() + } + return nil +} + +// NewZstdCompressingWriter returns a writer that compresses each chunk of the +// input using zstd and writes the compressed data to the underlying writer. +// The writer uses a fixed-size 4MB buffer for compression. +func NewZstdCompressingWriter(w io.Writer) interfaces.CommittedWriteCloser { + compressBuf := compressBufPool.Get() + compressor := &compressingWriter{ + w: w, + compressBuf: compressBuf, + poolCompressBuf: compressBuf, + } + bw := bufWriterPool.Get().(*bufio.Writer) + bw.Reset(compressor) + cwc := ioutil.NewCustomCommitWriteCloser(bw) + cwc.CommitFn = func(_ int64) error { + return bw.Flush() + } + cwc.CloseFn = func() error { + if compressor.closed { + return status.FailedPreconditionError("Writer already closed, cannot close again") + } + err := compressor.Close() + bufWriterPool.Put(bw) + return err + } + return cwc +} + // NewZstdDecompressingReader reads zstd-compressed data from the input // reader and makes the decompressed data available on the output reader. 
The // output reader is also an io.WriterTo, which can often prevent allocations diff --git a/server/util/compression/compression_test.go b/server/util/compression/compression_test.go index 1c68219484c..de7e126fe49 100644 --- a/server/util/compression/compression_test.go +++ b/server/util/compression/compression_test.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "math" "strconv" "testing" @@ -50,18 +49,38 @@ func TestLossless(t *testing.T) { compress: compressWithNewZstdCompressingReader, decompress: decompressWithNewZstdDecompressingReader, }, + { + name: "NewZstdCompressingWriter -> DecompressZstd", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithDecompressZstd, + }, + { + name: "NewZstdCompressingWriter -> NewZstdDecompressor", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithNewZstdDecompressor, + }, + { + name: "NewZstdCompressingWriter -> NewZstdDecompressingReader", + compress: compressWithNewZstdCompressingWriter, + decompress: decompressWithNewZstdDecompressingReader, + }, } { - for i := 1; i <= 5; i++ { - srclen := int(math.Pow10(i)) + for _, srclen := range []int{9, 99, 999, 1_999_999, 5_999_999} { name := tc.name + "_" + strconv.Itoa(srclen) + "_bytes" t.Run(name, func(t *testing.T) { _, r := testdigest.NewReader(t, int64(srclen)) src, err := io.ReadAll(r) require.NoError(t, err) require.Equal(t, srclen, len(src)) compressed := tc.compress(t, src) decompressed := tc.decompress(t, len(src), compressed) + require.Len(t, decompressed, srclen) + if srclen > 1000 { + // Compare every byte (covers the 4MB chunk boundary) while avoiding cmp.Diff's huge failure output on large inputs. + require.True(t, bytes.Equal(src, decompressed)) + return + } require.Empty(t, cmp.Diff(src, decompressed)) }) } @@ -88,6 +107,19 @@ func compressWithNewZstdCompressingReader(t *testing.T, src []byte) []byte { return compressed } +func compressWithNewZstdCompressingWriter(t *testing.T, src []byte) []byte { 
+ compressed := &bytes.Buffer{} + cw := compression.NewZstdCompressingWriter(compressed) + written, err := cw.Write(src) + require.NoError(t, err) + require.Equal(t, len(src), written) + err = cw.Commit() + require.NoError(t, err) + err = cw.Close() + require.NoError(t, err) + return compressed.Bytes() +} + func decompressWithDecompressZstd(t *testing.T, srclen int, compressed []byte) []byte { decompressed := make([]byte, srclen) decompressed, err := compression.DecompressZstd(decompressed, compressed) @@ -111,10 +144,9 @@ func decompressWithNewZstdDecompressingReader(t *testing.T, srclen int, compress rc := io.NopCloser(bytes.NewReader(compressed)) d, err := compression.NewZstdDecompressingReader(rc) require.NoError(t, err) - buf := make([]byte, srclen) - n, err := d.Read(buf) + buf, err := io.ReadAll(d) require.NoError(t, err) - require.Equal(t, srclen, n) + require.Len(t, buf, srclen) err = d.Close() require.NoError(t, err) err = rc.Close()