Skip to content

Commit bda4d40

Browse files
committed
Close the old connection to make sure the broker drops the producer on its side
1 parent 742f1b1 commit bda4d40

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

pulsar/internal/connection.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,7 @@ func (c *connection) Close() {
843843
c.Lock()
844844
cnx := c.cnx
845845
// do not use changeState() since they share the same lock
846-
c.setState(connectionClosed)
846+
c.setState(connectionClosing)
847847
c.cond.Broadcast()
848848
c.Unlock()
849849

@@ -853,6 +853,13 @@ func (c *connection) Close() {
853853

854854
close(c.closeCh)
855855

856+
c.Lock()
857+
cnx := c.cnx
858+
// do not use changeState() since they share the same lock
859+
c.setState(connectionClosed)
860+
c.cond.Broadcast()
861+
c.Unlock()
862+
856863
listeners := make(map[uint64]ConnectionListener)
857864
c.listenersLock.Lock()
858865
for id, listener := range c.listeners {

0 commit comments

Comments
 (0)