Skip to content

Commit aa25d02

Browse files
committed
ConsumerGroup.Close can infinitely lock
Signed-off-by: Magomed Abdurakhmanov <[email protected]>
1 parent 01b853a commit aa25d02

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

consumer_group.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,12 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
213213
return err
214214
}
215215

216-
// Wait for session exit signal
217-
<-sess.ctx.Done()
216+
// Wait for session exit signal or Close() call
217+
select {
218+
case <-c.closed:
219+
case <-sess.ctx.Done():
220+
default:
221+
}
218222

219223
// Gracefully release session claims
220224
return sess.release(true)

0 commit comments

Comments
 (0)