Skip to content

Commit 036c589

Browse files
authored
feat: SSE introduce sync pool for rawEvent to optimize memory usage (#1011)
* sse: introduce a sync pool for `rawEvent` to optimize memory usage. * fix deferred resource cleanup in event parsing Move the `defer putRawEvent(ed)` call after error handling to ensure resources are properly released only when parsing succeeds. This prevents potential resource leaks when errors occur during event parsing.
1 parent ed875fb commit 036c589

File tree

1 file changed

+54
-30
lines changed

1 file changed

+54
-30
lines changed

sse.go

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -534,45 +534,54 @@ func (es *EventSource) listenStream(res *http.Response) error {
534534
return nil
535535
}
536536

537-
e, err := readEvent(scanner)
538-
if err != nil {
539-
if err == io.EOF {
540-
return err
541-
}
542-
es.triggerOnError(err)
537+
if err := es.processEvent(scanner); err != nil {
543538
return err
544539
}
540+
}
541+
}
545542

546-
ed, err := parseEvent(e)
547-
if err != nil {
548-
es.triggerOnError(err)
549-
continue // parsing errors, just continue
543+
func (es *EventSource) processEvent(scanner *bufio.Scanner) error {
544+
e, err := readEvent(scanner)
545+
if err != nil {
546+
if err == io.EOF {
547+
return err
550548
}
549+
es.triggerOnError(err)
550+
return err
551+
}
552+
553+
ed, err := parseEvent(e)
554+
if err != nil {
555+
es.triggerOnError(err)
556+
return nil // parsing errors, will not return error.
557+
}
558+
defer putRawEvent(ed)
559+
560+
if len(ed.ID) > 0 {
561+
es.lock.Lock()
562+
es.lastEventID = string(ed.ID)
563+
es.lock.Unlock()
564+
}
551565

552-
if len(ed.ID) > 0 {
566+
if len(ed.Retry) > 0 {
567+
if retry, err := strconv.Atoi(string(ed.Retry)); err == nil {
553568
es.lock.Lock()
554-
es.lastEventID = string(ed.ID)
569+
es.serverSentRetry = time.Millisecond * time.Duration(retry)
555570
es.lock.Unlock()
571+
} else {
572+
es.triggerOnError(err)
556573
}
574+
}
557575

558-
if len(ed.Retry) > 0 {
559-
if retry, err := strconv.Atoi(string(ed.Retry)); err == nil {
560-
es.lock.Lock()
561-
es.serverSentRetry = time.Millisecond * time.Duration(retry)
562-
es.lock.Unlock()
563-
} else {
564-
es.triggerOnError(err)
565-
}
566-
}
567-
568-
if len(ed.Data) > 0 {
569-
es.handleCallback(&Event{
570-
ID: string(ed.ID),
571-
Name: string(ed.Event),
572-
Data: string(ed.Data),
573-
})
574-
}
576+
if len(ed.Data) > 0 {
577+
es.handleCallback(&Event{
578+
ID: string(ed.ID),
579+
Name: string(ed.Event),
580+
Data: string(ed.Data),
581+
})
575582
}
583+
584+
return nil
576585
}
577586

578587
func (es *EventSource) handleCallback(e *Event) {
@@ -633,7 +642,7 @@ func parseEventFunc(msg []byte) (*rawEvent, error) {
633642
return nil, errors.New("resty:sse: event message was empty")
634643
}
635644

636-
e := new(rawEvent)
645+
e := newRawEvent()
637646

638647
// Split the line by "\n"
639648
for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' }) {
@@ -670,3 +679,18 @@ func trimHeader(size int, data []byte) []byte {
670679
data = bytes.TrimSuffix(data, []byte("\n"))
671680
return data
672681
}
682+
683+
var rawEventPool = &sync.Pool{New: func() any { return new(rawEvent) }}
684+
685+
func newRawEvent() *rawEvent {
686+
e := rawEventPool.Get().(*rawEvent)
687+
e.ID = e.ID[:0]
688+
e.Data = e.Data[:0]
689+
e.Event = e.Event[:0]
690+
e.Retry = e.Retry[:0]
691+
return e
692+
}
693+
694+
func putRawEvent(e *rawEvent) {
695+
rawEventPool.Put(e)
696+
}

0 commit comments

Comments
 (0)