Skip to content

After the local batch processing is completed, the displacement is submitted and the single-thread consumption is performed, and the message re-consumption thread will appear #3138

@ReganWz

Description

@ReganWz

After the local batch processing is completed, the displacement is submitted and the single-thread consumption is performed, and the message re-consumption thread will appear

`
package main

import (
"context"
"github.com/IBM/sarama"
"log"
"sync"
)

type ConsumerGroupHandler struct {
batchList []*sarama.ConsumerMessage
}

func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}

func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}

func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for msg := range claim.Messages() {
	h.batchList = append(h.batchList, msg)
	if len(h.batchList) >= 1000 {
		h.batchProcessKafkaData(session)
	}
}
return nil

}

func (h *ConsumerGroupHandler) batchProcessKafkaData(session sarama.ConsumerGroupSession) {
//业务处理
for _, msg := range h.batchList {
session.MarkMessage(msg, "")
}
}

func main() {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false
config.Version = sarama.V2_5_0_0

brokerList := []string{"localhost:9092"}
topic := "test"

client, err := sarama.NewConsumerGroup(brokerList, "test-group", config)
if err != nil {
	log.Panicf("Error creating consumer group: %v", err)
}
defer client.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
// 启动消费
go func() {
	for {
		if err := client.Consume(ctx, []string{topic}, &ConsumerGroupHandler{
			batchList: make([]*sarama.ConsumerMessage, 0),
		}); err != nil {
			log.Panicf("Error from consumer: %v", err)
		}
	}

}()
wg.Wait()

}

`

Metadata

Metadata

Assignees

No one assigned

    Labels

    staleIssues and pull requests without any recent activity

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions