[compression] NewZstdCompressingWriter #9521
@@ -7,11 +7,14 @@ import (
    "sync"

    "github.com/buildbuddy-io/buildbuddy/server/metrics"
    "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
    "github.com/buildbuddy-io/buildbuddy/server/util/log"
    "github.com/klauspost/compress/zstd"
    "github.com/prometheus/client_golang/prometheus"
)

const compressChunkSize = 4 * 1024 * 1024 // 4MB

var (
    // zstdEncoder can be shared across goroutines to compress chunks of data
    // using EncodeAll. Streaming functions such as encoder.ReadFrom or io.Copy

@@ -22,6 +25,8 @@ var (
    // either for streaming decompression using ReadFrom or batch decompression
    // using DecodeAll. The returned decoders *must not* be closed.
    zstdDecoderPool = NewZstdDecoderPool()

    compressBufPool = bytebufferpool.FixedSize(compressChunkSize)
)

func mustGetZstdEncoder() *zstd.Encoder {

@@ -177,6 +182,50 @@ func NewZstdCompressingReader(reader io.ReadCloser, readBuf []byte, compressBuf
    }, nil
}

type compressingWriter struct {
    w               io.Writer
    compressBuf     []byte
    poolCompressBuf []byte
}

func (c *compressingWriter) Write(p []byte) (int, error) {
    var totalWritten int
    for len(p) > 0 {
        chunkSize := min(len(p), cap(c.compressBuf))
        chunk := p[:chunkSize]
        c.compressBuf = CompressZstd(c.compressBuf[:0], chunk)

        written, err := c.w.Write(c.compressBuf)
        if err != nil {
            return totalWritten, err
        }
        if written < len(c.compressBuf) {
            return totalWritten, io.ErrShortWrite
        }

        totalWritten += chunkSize
        p = p[chunkSize:]
    }
    return totalWritten, nil
}

func (c *compressingWriter) Close() error {
    compressBufPool.Put(c.poolCompressBuf)
    return nil
}

Review thread on Close:

- "Should this also close the wrapped writer?"
- "I guess so? I guess …"
- "Oh nvm, I didn't realize you're accepting a writer. In that case, it's ok for the caller to be responsible for closing the writer, if it needs to be closed. On the other hand, this makes it harder for the caller if they need to return the writer further up the stream. Maybe you can just add … and then document that if the passed-in writer is a Closer, it will be closed when the returned writer is closed."
- "Oh, I like it!"
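The elided suggestion is presumably a type assertion on the wrapped writer. A minimal sketch of that idea, not the PR's final code, assuming the compressingWriter fields shown in the diff above:

```go
// Sketch: forward Close to the wrapped writer when it happens to be a Closer,
// as suggested in the thread above. Buffer pooling behavior is unchanged.
func (c *compressingWriter) Close() error {
    compressBufPool.Put(c.poolCompressBuf)
    if closer, ok := c.w.(io.Closer); ok {
        // Document on NewZstdCompressingWriter that a writer implementing
        // io.Closer will be closed when the returned writer is closed.
        return closer.Close()
    }
    return nil
}
```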

// NewZstdCompressingWriter returns a writer that compresses each chunk of the
// input using zstd and writes the compressed data to the underlying writer.
// The writer uses a fixed-size 4MB buffer for compression.

Review thread on the buffering behavior:

- "My interpretation of this is that if I do several writes of 1 byte each, they will be buffered up to 4MB and then that 4MB chunk will be compressed in a single batch, but the current impl is that each call to Write() would create a separate compression frame for each byte, which would be inefficient."
- "I think we might want to have a ZstdEncoderPool (similar to ZstdDecoderPool), where each encoder has the max concurrency option set to 1, and then pool the encoders instead of pooling byte buffers. That way, we let the zstd library optimally handle compression frame allocation for us (based on the compression level), instead of introducing our own 4MB compression window on top of the compression window that zstd is already using internally."
- "@bduffany I changed the implementation to use a bufio.Writer, so all writes up to 4MB will in fact be buffered."
- "I sort of mentioned this in my follow-up comment (not sure if you saw it), but it seems better to pool the encoders and let the compression library handle buffering, instead of implementing this buffering ourselves?"
- "Ack, gotcha! Yes, that'd be pretty sweet."
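A rough sketch of the encoder-pool idea discussed above. The names (encoderPool, pooledCompressingWriter, NewPooledZstdCompressingWriter) are hypothetical and this is not the PR's implementation; it only illustrates reusing streaming encoders instead of byte buffers:

```go
import (
    "io"
    "sync"

    "github.com/klauspost/compress/zstd"
)

// encoderPool is a hypothetical pool of streaming zstd encoders, analogous to
// the existing ZstdDecoderPool. Concurrency is capped at 1 per encoder so each
// pooled encoder stays cheap and buffers/frames input internally.
var encoderPool = sync.Pool{
    New: func() any {
        enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
        if err != nil {
            panic(err) // options are static; this should never fail
        }
        return enc
    },
}

type pooledCompressingWriter struct {
    enc *zstd.Encoder
}

// NewPooledZstdCompressingWriter sketches the reviewer's suggestion: pool
// encoders and let the zstd library decide how to buffer and frame the input.
func NewPooledZstdCompressingWriter(w io.Writer) io.WriteCloser {
    enc := encoderPool.Get().(*zstd.Encoder)
    enc.Reset(w) // point the pooled encoder at the new destination
    return &pooledCompressingWriter{enc: enc}
}

func (p *pooledCompressingWriter) Write(b []byte) (int, error) {
    // The encoder buffers internally, so tiny writes do not each become a frame.
    return p.enc.Write(b)
}

func (p *pooledCompressingWriter) Close() error {
    // Close flushes the final frame; the encoder can then be reused via Reset.
    err := p.enc.Close()
    encoderPool.Put(p.enc)
    p.enc = nil
    return err
}
```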
func NewZstdCompressingWriter(w io.Writer) io.WriteCloser {
    compressBuf := compressBufPool.Get()
    return &compressingWriter{
        w:               w,
        compressBuf:     compressBuf,
        poolCompressBuf: compressBuf,
    }
}
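For illustration, a caller might use the new writer roughly like this. This is a sketch: the import path is inferred from the sibling imports in the diff, and the payload is made up.

```go
package main

import (
    "bytes"
    "fmt"
    "log"

    "github.com/buildbuddy-io/buildbuddy/server/util/compression"
)

func main() {
    var buf bytes.Buffer
    cw := compression.NewZstdCompressingWriter(&buf)
    if _, err := cw.Write([]byte("hello, zstd")); err != nil {
        log.Fatal(err)
    }
    // Close returns the internal compression buffer to the pool.
    if err := cw.Close(); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("compressed %d input bytes into %d output bytes\n",
        len("hello, zstd"), buf.Len())
}
```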
// NewZstdDecompressingReader reads zstd-compressed data from the input
// reader and makes the decompressed data available on the output reader. The
// output reader is also an io.WriterTo, which can often prevent allocations
Review comment on the writer's lifecycle:

- "Write and Close should probably both fail if Close was already called. At the very least, if Close was already called and we call Close again, we shouldn't put the compress buffer in the pool again."
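One way to address this, sketched under the assumption that compressingWriter gains a hypothetical `closed bool` field; errWriterClosed is likewise made up, and this is not necessarily how the PR resolves the comment:

```go
var errWriterClosed = errors.New("compression: writer already closed")

func (c *compressingWriter) Write(p []byte) (int, error) {
    if c.closed {
        return 0, errWriterClosed
    }
    var totalWritten int
    for len(p) > 0 {
        // Same chunked compression loop as in the diff above.
        chunkSize := min(len(p), cap(c.compressBuf))
        c.compressBuf = CompressZstd(c.compressBuf[:0], p[:chunkSize])
        written, err := c.w.Write(c.compressBuf)
        if err != nil {
            return totalWritten, err
        }
        if written < len(c.compressBuf) {
            return totalWritten, io.ErrShortWrite
        }
        totalWritten += chunkSize
        p = p[chunkSize:]
    }
    return totalWritten, nil
}

func (c *compressingWriter) Close() error {
    if c.closed {
        return nil // repeated Close is a no-op; never double-pool the buffer
    }
    c.closed = true
    compressBufPool.Put(c.poolCompressBuf)
    c.poolCompressBuf = nil
    return nil
}
```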