[compression] NewZstdCompressingWriter #9521
Conversation
```go
func (c *compressingWriter) Close() error {
	compressBufPool.Put(c.poolCompressBuf)
	return nil
```
should this also close `c.w`?
I guess so? `NewZstdCompressingWriter` takes a Writer and returns a WriteCloser, and it would be more consistent if it took a WriteCloser as input.
Oh nvm, I didn't realize you're accepting a writer. In that case, it's ok for the caller to be responsible for closing the writer, if it needs to be closed.
On the other hand, this makes it harder for the caller if they need to return the writer further up the stack. Maybe you can just add

```go
if closer, ok := c.w.(io.Closer); ok {
	return closer.Close()
}
```

and then document that if the passed-in writer is a Closer, it will be closed when the returned writer is closed.
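Putting the pieces together, a minimal sketch of what Close might look like with that suggestion applied (the pool and field names come from the diff above; the surrounding type is assumed):

```go
// Close returns the compression buffer to the pool and, if the underlying
// Writer is also an io.Closer, closes it too.
func (c *compressingWriter) Close() error {
	compressBufPool.Put(c.poolCompressBuf)
	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}
```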
Oh, I like it!
I realized multiple small Writes could lead to poor compression, so I decided to wrap this Writer in a bufio.Writer. What do you think?
LGTM. A nice side effect is that you get a decent implementation of `ReadFrom` for free. Also, if you get large writes, `bufio.Writer` skips the buffering, so it won't add unnecessary copies.
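For reference, a sketch of what that wrapping could look like. `compressChunkSize` comes from the diff below; the `wrap` helper and the pool shape are illustrative, not the PR's actual code:

```go
import (
	"bufio"
	"io"
	"sync"
)

var bufWriterPool = sync.Pool{
	New: func() any {
		// Size the buffer to the compression chunk so small Writes batch
		// up before reaching the compressor.
		return bufio.NewWriterSize(io.Discard, compressChunkSize)
	},
}

// wrap is a hypothetical helper: it rebinds a pooled bufio.Writer to w.
// Writes larger than the buffer bypass it (no extra copy), and
// bufio.Writer's ReadFrom implementation comes along for free.
func wrap(w io.Writer) *bufio.Writer {
	bw := bufWriterPool.Get().(*bufio.Writer)
	bw.Reset(w)
	return bw
}
```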
```
@@ -27,6 +30,11 @@ var (
	zstdDecoderPool = NewZstdDecoderPool()

	compressBufPool = bytebufferpool.FixedSize(compressChunkSize)
	writerPool = sync.Pool{
```
nit: maybe call this `bufWriterPool`
```go
// Close puts the compression buffer back into the pool.
// If the underlying Writer is also a Closer, it closes it.
func (c *compressingWriter) Close() error {
```
Write and Close should probably both fail if Close was already called. At the very least, if Close was already called and we call Close again, we shouldn't put the compress buffer in the pool again.
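A minimal sketch of that guard, assuming a `closed` field on `compressingWriter` (names beyond those in the diff are illustrative):

```go
func (c *compressingWriter) Write(p []byte) (int, error) {
	if c.closed {
		return 0, os.ErrClosed // fail Writes after Close
	}
	// ... compress p and forward it to c.w ...
	return len(p), nil
}

func (c *compressingWriter) Close() error {
	if c.closed {
		return os.ErrClosed // never return the buffer to the pool twice
	}
	c.closed = true
	compressBufPool.Put(c.poolCompressBuf)
	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}
```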
```go
}
cwc.CloseFn = func() error {
	err := compressor.Close()
	writerPool.Put(bw)
```
Same thing here about not adding to the pool twice if Close is called twice. If you add a `compressingWriter.closed` field, you could just check that.
```go
// NewZstdCompressingWriter returns a writer that compresses each chunk of the
// input using zstd and writes the compressed data to the underlying writer.
// The writer uses a fixed-size 4MB buffer for compression.
```
My interpretation of this is that if I do several writes of 1 byte each, they will be buffered up to 4MB and then that 4MB chunk will be compressed in a single batch, but the current impl is that each call to Write() would create a separate compression frame for each byte, which would be inefficient.
I think we might want a ZstdEncoderPool (similar to ZstdDecoderPool), where each encoder has its max concurrency option set to 1, and pool the encoders instead of pooling byte buffers. That way, we let the zstd library optimally handle compression frame allocation for us (based on the compression level), instead of introducing our own 4MB compression window on top of the window that zstd is already using internally.
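A sketch of what that could look like with `github.com/klauspost/compress/zstd` (assuming that's the zstd library in use; the pool shape only loosely mirrors the existing ZstdDecoderPool, and the helper names are hypothetical):

```go
import (
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

var zstdEncoderPool = sync.Pool{
	New: func() any {
		// Single-threaded encoders: let the library's own windowing and
		// frame management do the buffering instead of a manual 4MB buffer.
		enc, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
		return enc
	},
}

func getEncoder(w io.Writer) *zstd.Encoder {
	enc := zstdEncoderPool.Get().(*zstd.Encoder)
	enc.Reset(w) // rebind the pooled encoder to the new destination
	return enc
}

func putEncoder(enc *zstd.Encoder) error {
	err := enc.Close() // flush and finish the current frame
	enc.Reset(nil)     // drop the reference to the caller's writer
	zstdEncoderPool.Put(enc)
	return err
}
```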
@bduffany I changed the implementation to use a bufio.Writer, so all writes up to 4MB will in fact be buffered.
I sort of mentioned this in my follow-up comment (not sure if you saw it), but it seems better to pool the encoders and let the compression library handle buffering, instead of implementing this buffering ourselves?
Ack, gotcha! Yes, that'd be pretty sweet.
This reverts commit e0fba15.
Not yet needed! Closing for now. Thanks for the pointers!