
[compression] NewZstdCompressingWriter #9521


Closed

wants to merge 12 commits into from

Conversation

dan-stowell (Contributor)

No description provided.


func (c *compressingWriter) Close() error {
	compressBufPool.Put(c.poolCompressBuf)
	return nil

Contributor

should this also close c.w?

Contributor Author

I guess so? NewZstdCompressingWriter takes a Writer and returns a WriteCloser, so it would be more consistent if it took a WriteCloser as input.

Contributor

Oh nvm, I didn't realize you're accepting a writer. In that case, it's ok for the caller to be responsible for closing the writer, if it needs to be closed.

On the other hand, this makes it harder for the caller if they need to return the writer further up the stream. Maybe you can just add

if closer, ok := c.w.(io.Closer); ok {
  return closer.Close()
}

And then document that if the passed in writer is a Closer, it will be closed when the returned writer is closed.
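
For reference, a minimal sketch of what the combined Close could look like, reusing the compressBufPool and poolCompressBuf names from the diff above (io is assumed to be imported):

func (c *compressingWriter) Close() error {
	compressBufPool.Put(c.poolCompressBuf)
	// If the wrapped writer is also a Closer, close it and surface its error.
	if closer, ok := c.w.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}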

Contributor Author

Oh, I like it!

@dan-stowell (Contributor Author)

@vanja-p, I realized multiple small Writes could lead to poor compression, so I decided to wrap this Writer in a bufio.Writer. What do you think?

@bduffany, safe to use 4 * 1024 * 1024 bytes for the underlying compressBuf?
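
As a rough sketch of that layering (the bufferedWriteCloser type and newBufferedCompressingWriter name are illustrative, not this PR's code; compressChunkSize is assumed to be the 4MB constant mentioned above, and bufio/io are assumed imports):

// Sketch only: put a bufio.Writer in front of the compressing writer so many
// small Writes are coalesced into large chunks before they are compressed.
type bufferedWriteCloser struct {
	*bufio.Writer
	closeFn func() error
}

func (b *bufferedWriteCloser) Close() error {
	if err := b.Flush(); err != nil {
		return err
	}
	return b.closeFn()
}

func newBufferedCompressingWriter(compressor io.WriteCloser) io.WriteCloser {
	bw := bufio.NewWriterSize(compressor, compressChunkSize) // e.g. 4MB
	return &bufferedWriteCloser{Writer: bw, closeFn: compressor.Close}
}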

@vanja-p (Contributor) left a comment

> I realized multiple small Writes could lead to poor compression, so I decided to wrap this Writer in a bufio.Writer. What do you think?

LGTM. A nice side effect is that you get a decent implementation of ReadFrom for free. Also, if you get large writes, bufio.Writer skips the buffering, so it won't add unnecessary copies.
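
A small usage sketch of that side effect, assuming the returned WriteCloser promotes the bufio.Writer's ReadFrom and still has the Writer-in, WriteCloser-out signature discussed above (compressFile and the package name are illustrative):

// Hypothetical usage: stream src through the compressing writer into dst.
// io.Copy detects the io.ReaderFrom implementation and delegates to it,
// skipping its own intermediate copy buffer.
func compressFile(dst io.Writer, src io.Reader) error {
	zw := compression.NewZstdCompressingWriter(dst)
	if _, err := io.Copy(zw, src); err != nil {
		zw.Close()
		return err
	}
	return zw.Close()
}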

@@ -27,6 +30,11 @@ var (
	zstdDecoderPool = NewZstdDecoderPool()

	compressBufPool = bytebufferpool.FixedSize(compressChunkSize)
	writerPool = sync.Pool{

Contributor

nit: maybe call this bufWriterPool


// Close puts the compression buffer back into the pool.
// If the underlying Writer is also a Closer, it closes it.
func (c *compressingWriter) Close() error {

Contributor

Write and Close should probably both fail if Close was already called. At the very least, if Close was already called and we call Close again, we shouldn't put the compress buffer in the pool again.

}
cwc.CloseFn = func() error {
	err := compressor.Close()
	writerPool.Put(bw)

Contributor

same thing here about not adding to the pool twice if close is called twice. If you add a compressingWriter.closed field, you could just check that.
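
A possible shape for that guard, sketched with a hypothetical closed field and a hypothetical write helper holding the existing compression logic (os is an assumed import):

// Sketch: make Close idempotent so the buffer is returned to the pool at most
// once, and have Write fail after Close. closed and write are hypothetical.
func (c *compressingWriter) Close() error {
	if c.closed {
		return nil
	}
	c.closed = true
	compressBufPool.Put(c.poolCompressBuf)
	// ... close the underlying writer as before ...
	return nil
}

func (c *compressingWriter) Write(p []byte) (int, error) {
	if c.closed {
		return 0, os.ErrClosed
	}
	return c.write(p) // existing compression logic, factored into a helper
}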


// NewZstdCompressingWriter returns a writer that compresses each chunk of the
// input using zstd and writes the compressed data to the underlying writer.
// The writer uses a fixed-size 4MB buffer for compression.

@bduffany (Member) Jun 2, 2025

My interpretation of this is that if I do several writes of 1 byte each, they will be buffered up to 4MB and then that 4MB chunk will be compressed in a single batch, but the current impl is that each call to Write() would create a separate compression frame for each byte, which would be inefficient.

@bduffany (Member) Jun 2, 2025

I think maybe we might want to have a ZstdEncoderPool (similar to ZstdDecoderPool), where each encoder has the max concurrency option set to 1, and then pool the encoders, instead of pooling byte buffers. That way, we let the zstd library optimally handle compression frame allocation for us (based on the compression level), instead of introducing our own 4MB compression window on top of the compression window that zstd is already using internally.
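
A rough sketch of that direction, assuming the zstd implementation is github.com/klauspost/compress/zstd; every name below is illustrative rather than this PR's code:

import (
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// Pool single-threaded encoders and let the library manage its own internal
// buffering and frame layout.
var zstdEncoderPool = sync.Pool{
	New: func() any {
		enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
		if err != nil {
			panic(err) // only possible with invalid options
		}
		return enc
	},
}

// newPooledZstdWriter borrows an encoder, points it at w, and returns it to
// the pool when the returned writer is closed.
func newPooledZstdWriter(w io.Writer) io.WriteCloser {
	enc := zstdEncoderPool.Get().(*zstd.Encoder)
	enc.Reset(w)
	return &pooledEncoder{enc: enc}
}

type pooledEncoder struct {
	enc *zstd.Encoder
}

func (p *pooledEncoder) Write(b []byte) (int, error) { return p.enc.Write(b) }

func (p *pooledEncoder) Close() error {
	if p.enc == nil {
		return nil // already closed
	}
	err := p.enc.Close() // flushes buffered data and writes the final frame
	p.enc.Reset(nil)     // drop the reference to the caller's writer
	zstdEncoderPool.Put(p.enc)
	p.enc = nil
	return err
}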

Contributor Author

@bduffany I changed the implementation to use a bufio.Writer, so all writes up to 4MB will in fact be buffered.

Member

I sort of mentioned this in my follow-up comment (not sure if you saw it), but it seems better to pool the encoders and let the compression library handle buffering, instead of implementing this buffering ourselves?

Contributor Author

Ack, gotcha! Yes, that'd be pretty sweet.

@dan-stowell (Contributor Author)

Not yet needed! Closing for now. Thanks for the pointers!

@dan-stowell dan-stowell closed this Jun 3, 2025
@dan-stowell dan-stowell deleted the dan-zcw branch June 6, 2025 13:40