Skip to content

Commit ea63565

Browse files
committed
propagate compressorOptions in stream and rpc_util
1 parent 2e191a7 commit ea63565

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

rpc_util.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ func (d *gzipDecompressor) Type() string {
152152
// callInfo contains all related configuration and information about an RPC.
153153
type callInfo struct {
154154
compressorName string
155+
compressorOptions []any
155156
failFast bool
156157
maxReceiveMessageSize *int
157158
maxSendMessageSize *int
@@ -452,7 +453,7 @@ func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {}
452453
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
453454
// later release.
454455
func UseCompressor(name string, compressorOptions ...any) CallOption {
455-
return CompressorCallOption{CompressorType: name}
456+
return CompressorCallOption{CompressorType: name, CompressorOptions: compressorOptions}
456457
}
457458

458459
// CompressorCallOption is a CallOption that indicates the compressor to use.
@@ -462,11 +463,13 @@ func UseCompressor(name string, compressorOptions ...any) CallOption {
462463
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
463464
// later release.
464465
type CompressorCallOption struct {
465-
CompressorType string
466+
CompressorType string
467+
CompressorOptions []any
466468
}
467469

468470
func (o CompressorCallOption) before(c *callInfo) error {
469471
c.compressorName = o.CompressorType
472+
c.compressorOptions = o.CompressorOptions
470473
return nil
471474
}
472475
func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
@@ -735,7 +738,7 @@ func encode(c baseCodec, msg any) (mem.BufferSlice, error) {
735738
// indicating no compression was done.
736739
//
737740
// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
738-
func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, pool mem.BufferPool) (mem.BufferSlice, payloadFormat, error) {
741+
func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor, pool mem.BufferPool, compressorOptions ...any) (mem.BufferSlice, payloadFormat, error) {
739742
if (compressor == nil && cp == nil) || in.Len() == 0 {
740743
return nil, compressionNone, nil
741744
}
@@ -746,7 +749,7 @@ func compress(in mem.BufferSlice, cp Compressor, compressor encoding.Compressor,
746749
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
747750
}
748751
if compressor != nil {
749-
z, err := compressor.Compress(w)
752+
z, err := compressor.Compress(w, compressorOptions...)
750753
if err != nil {
751754
return nil, 0, wrapErr(err)
752755
}

stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ func (cs *clientStream) SendMsg(m any) (err error) {
910910
}
911911

912912
// load hdr, payload, data
913-
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
913+
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool, cs.callInfo.compressorOptions)
914914
if err != nil {
915915
return err
916916
}
@@ -1417,7 +1417,7 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
14171417
}
14181418

14191419
// load hdr, payload, data
1420-
hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
1420+
hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool, as.callInfo.compressorOptions)
14211421
if err != nil {
14221422
return err
14231423
}
@@ -1814,7 +1814,7 @@ func MethodFromServerStream(stream ServerStream) (string, bool) {
18141814
// compression was made and therefore whether the payload needs to be freed in
18151815
// addition to the returned data. Freeing the payload if the returned boolean is
18161816
// false can lead to undefined behavior.
1817-
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
1817+
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool, compressorOptions ...any) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
18181818
if preparedMsg, ok := m.(*PreparedMsg); ok {
18191819
return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
18201820
}
@@ -1824,7 +1824,7 @@ func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor,
18241824
if err != nil {
18251825
return nil, nil, nil, 0, err
18261826
}
1827-
compData, pf, err := compress(data, cp, comp, pool)
1827+
compData, pf, err := compress(data, cp, comp, pool, compressorOptions...)
18281828
if err != nil {
18291829
data.Free()
18301830
return nil, nil, nil, 0, err

0 commit comments

Comments
 (0)