
Commit ee37c7f

Refactor Consumer Group to use Client (segmentio#947)
* refactor ConsumerGroup to use Client
* add test for readers sharing the default transport
1 parent d4b89e7 commit ee37c7f

11 files changed, +565 -704 lines changed
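The second commit bullet is about readers in one consumer group falling back to the shared default transport when none is configured explicitly. A minimal sketch of that setup, not taken from the commit itself (broker address, topic, and group ID are placeholders):

    package main

    import (
        "context"
        "fmt"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        newReader := func() *kafka.Reader {
            return kafka.NewReader(kafka.ReaderConfig{
                Brokers: []string{"localhost:9092"}, // placeholder broker
                GroupID: "example-group",            // placeholder group
                Topic:   "example-topic",            // placeholder topic
            })
        }

        // Two readers in the same group, neither with a custom dialer/transport.
        r1 := newReader()
        r2 := newReader()
        defer r1.Close()
        defer r2.Close()

        msg, err := r1.ReadMessage(context.Background())
        if err != nil {
            fmt.Println("read error:", err)
            return
        }
        fmt.Printf("partition %d offset %d: %s\n", msg.Partition, msg.Offset, msg.Value)
        _ = r2 // participates in the same group rebalance
    }

Per the commit message, readers created like this now share the default transport under the hood.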

conn.go (+18 -49)
@@ -133,16 +133,15 @@ const (
 	ReadCommitted IsolationLevel = 1
 )
 
-var (
-	// DefaultClientID is the default value used as ClientID of kafka
-	// connections.
-	DefaultClientID string
-)
+// DefaultClientID is the default value used as ClientID of kafka
+// connections.
+var DefaultClientID string
 
 func init() {
 	progname := filepath.Base(os.Args[0])
 	hostname, _ := os.Hostname()
 	DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
+	DefaultTransport.(*Transport).ClientID = DefaultClientID
 }
 
 // NewConn returns a new kafka connection for the given topic and partition.
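Besides collapsing the var block, this hunk wires kafka.DefaultClientID into the default transport, so Client-based requests advertise the same client ID as Conn-based ones. A minimal sketch of overriding that on an explicitly constructed transport, assuming the Client/Transport fields used below (broker address is a placeholder, not code from the commit):

    package main

    import (
        "context"
        "fmt"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        // Explicit transport with a custom client ID; kafka.DefaultTransport now
        // carries kafka.DefaultClientID automatically after this change.
        transport := &kafka.Transport{ClientID: "my-service"}

        client := &kafka.Client{
            Addr:      kafka.TCP("localhost:9092"), // placeholder broker
            Transport: transport,
        }

        // Any client request (metadata here) is sent with the configured client ID.
        meta, err := client.Metadata(context.Background(), &kafka.MetadataRequest{})
        if err != nil {
            fmt.Println("metadata error:", err)
            return
        }
        fmt.Println("brokers:", len(meta.Brokers))
    }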
@@ -263,10 +262,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
 	}
 	for _, brokerMeta := range res.Brokers {
 		if brokerMeta.NodeID == res.ControllerID {
-			broker = Broker{ID: int(brokerMeta.NodeID),
+			broker = Broker{
+				ID: int(brokerMeta.NodeID),
 				Port: int(brokerMeta.Port),
 				Host: brokerMeta.Host,
-				Rack: brokerMeta.Rack}
+				Rack: brokerMeta.Rack,
+			}
 			break
 		}
 	}
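This hunk only reformats the Broker literal; Controller itself is exported. A small usage sketch (broker address is a placeholder):

    package main

    import (
        "fmt"
        "log"
        "net"
        "strconv"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        conn, err := kafka.Dial("tcp", "localhost:9092") // placeholder broker
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // Controller returns the broker currently acting as cluster controller.
        controller, err := conn.Controller()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("controller:", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    }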
@@ -322,7 +323,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
 	err := c.readOperation(
 		func(deadline time.Time, id int32) error {
 			return c.writeRequest(findCoordinator, v0, id, request)
-
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -340,32 +340,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
 	return response, nil
 }
 
-// heartbeat sends a heartbeat message required by consumer groups
-//
-// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
-func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
-	var response heartbeatResponseV0
-
-	err := c.writeOperation(
-		func(deadline time.Time, id int32) error {
-			return c.writeRequest(heartbeat, v0, id, request)
-		},
-		func(deadline time.Time, size int) error {
-			return expectZeroSize(func() (remain int, err error) {
-				return (&response).readFrom(&c.rbuf, size)
-			}())
-		},
-	)
-	if err != nil {
-		return heartbeatResponseV0{}, err
-	}
-	if response.ErrorCode != 0 {
-		return heartbeatResponseV0{}, Error(response.ErrorCode)
-	}
-
-	return response, nil
-}
-
 // joinGroup attempts to join a consumer group
 //
 // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
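The connection-level heartbeat helper goes away because group heartbeats are now issued through the Client. As a rough sketch only, assuming the Client exposes a Heartbeat method and request type shaped like this (that API is not shown in this diff; group, member, and generation values are placeholders):

    package main

    import (
        "context"
        "fmt"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        client := &kafka.Client{Addr: kafka.TCP("localhost:9092")} // placeholder broker

        // Assumed request shape: the heartbeat identifies the group, the member,
        // and the generation obtained from an earlier join/sync exchange.
        if _, err := client.Heartbeat(context.Background(), &kafka.HeartbeatRequest{
            GroupID:      "example-group", // placeholder
            MemberID:     "member-1",      // placeholder, normally assigned by the broker
            GenerationID: 1,               // placeholder
        }); err != nil {
            fmt.Println("heartbeat error:", err)
        }
    }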
@@ -752,9 +726,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
 // ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
 // with the default values in ReadBatchConfig except for minBytes and maxBytes.
 func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
-
 	var adjustedDeadline time.Time
-	var maxFetch = int(c.fetchMaxBytes)
+	maxFetch := int(c.fetchMaxBytes)
 
 	if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
 		return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
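ReadBatchWith is exported; the hunk just tightens the local declarations. A minimal sketch of calling it against a partition leader (address, topic, and partition are placeholders):

    package main

    import (
        "context"
        "fmt"
        "log"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        // DialLeader connects to the leader of the given topic/partition.
        conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "example-topic", 0)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // Fetch a batch bounded by the config; MinBytes must stay within the
        // connection's fetch limits, as checked in the hunk above.
        batch := conn.ReadBatchWith(kafka.ReadBatchConfig{
            MinBytes: 1,
            MaxBytes: 1024 * 1024,
        })
        defer batch.Close()

        for {
            msg, err := batch.ReadMessage()
            if err != nil {
                break // the batch ends with an error once it is drained
            }
            fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
        }
    }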
@@ -960,7 +933,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
 // connection. If there are none, the method fetches all partitions of the kafka
 // cluster.
 func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
-
 	if len(topics) == 0 {
 		if len(c.topic) != 0 {
 			defaultTopics := [...]string{c.topic}
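The same cosmetic change for ReadPartitions, which is also exported. A short usage sketch (broker address and topic are placeholders):

    package main

    import (
        "fmt"
        "log"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        conn, err := kafka.Dial("tcp", "localhost:9092") // placeholder broker
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // With no arguments, ReadPartitions falls back to the connection's topic,
        // or to every topic in the cluster, as described in the comment above.
        partitions, err := conn.ReadPartitions("example-topic") // placeholder topic
        if err != nil {
            log.Fatal(err)
        }
        for _, p := range partitions {
            fmt.Printf("%s/%d leader=%d\n", p.Topic, p.ID, p.Leader.ID)
        }
    }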
@@ -1107,11 +1079,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 		deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 		switch produceVersion {
 		case v7:
-			recordBatch, err :=
-				newRecordBatch(
-					codec,
-					msgs...,
-				)
+			recordBatch, err := newRecordBatch(
+				codec,
+				msgs...,
+			)
 			if err != nil {
 				return err
 			}
@@ -1126,11 +1097,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 				recordBatch,
 			)
 		case v3:
-			recordBatch, err :=
-				newRecordBatch(
-					codec,
-					msgs...,
-				)
+			recordBatch, err := newRecordBatch(
+				codec,
+				msgs...,
+			)
 			if err != nil {
 				return err
 			}
@@ -1195,7 +1165,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 			}
 			return size, err
 		}
-
 	})
 	if err != nil {
 		return size, err
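The three hunks above only reflow how record batches are built on the produce path. For orientation, a sketch of the public write entry point that ends up in this code (leader address and topic are placeholders; which produce version and batch helper get used depends on the broker and is not shown here):

    package main

    import (
        "context"
        "log"

        kafka "github.com/segmentio/kafka-go"
    )

    func main() {
        conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "example-topic", 0)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // WriteMessages produces to the connection's topic/partition; the record
        // batches sent on the wire are built by helpers like the ones edited above.
        if _, err := conn.WriteMessages(
            kafka.Message{Key: []byte("k1"), Value: []byte("hello")},
            kafka.Message{Key: []byte("k2"), Value: []byte("world")},
        ); err != nil {
            log.Fatal(err)
        }
    }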
@@ -1555,7 +1524,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
 		return nil, err
 	}
 	if version == v1 {
-		var request = saslAuthenticateRequestV0{Data: data}
+		request := saslAuthenticateRequestV0{Data: data}
 		var response saslAuthenticateResponseV0
 
 		err := c.writeOperation(
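saslAuthenticate is the unexported wire-level exchange; it runs when a connection is configured with a SASL mechanism, for example through a Dialer (address and credentials below are placeholders):

    package main

    import (
        "context"
        "log"

        kafka "github.com/segmentio/kafka-go"
        "github.com/segmentio/kafka-go/sasl/plain"
    )

    func main() {
        dialer := &kafka.Dialer{
            SASLMechanism: plain.Mechanism{
                Username: "user", // placeholder credentials
                Password: "pass",
            },
        }

        // The SASL handshake (and the saslAuthenticate exchange shown above)
        // happens during DialContext before the connection is handed back.
        conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9092")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    }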
