Proposal: Native Support for Server-Sent-Events (SSE) #3696

Open
@raphael

Background

Server-Sent Events (SSE) is an HTTP-based server-to-client streaming protocol that is gaining popularity thanks to its simplicity, its HTTP compatibility, and its adoption by newer protocols like MCP.

It is already possible today to implement a Server-Sent Events HTTP endpoint with a Goa service by defining a design that complies with the standard and bypassing the generated response body encoding and decoding code via SkipResponseBodyEncodeDecode:

Method("stream", func() {
	Description("Stream events using Server-Sent Events")
	HTTP(func() {
		GET("/events/stream")
		SkipResponseBodyEncodeDecode()
	})
})

Dummy implementation:

type sseReader struct{}

// Stream implements the Goa generated endpoint method.
func (s *Service) Stream(ctx context.Context) (resp io.ReadCloser, err error) {
    return &sseReader{}, nil
}

// WriteTo is invoked with the http.ResponseWriter when the response body is
// copied, which makes it possible to set headers and write SSE events directly.
func (r *sseReader) WriteTo(w io.Writer) (n int64, err error) {
    if rw, ok := w.(http.ResponseWriter); ok {
        rw.Header().Set("Content-Type", "text/event-stream")
        rw.Header().Set("Cache-Control", "no-cache")
    }
    // An SSE event is terminated by a blank line.
    written, err := w.Write([]byte("data: foo\n\n"))
    return int64(written), err
}

// Read only exists to satisfy the io.ReadCloser interface;
// WriteTo is used instead.
func (r *sseReader) Read(_ []byte) (n int, err error) {
    return 0, io.EOF
}

func (r *sseReader) Close() error {
    return nil
}

While this works, it requires intimate knowledge of the SSE protocol, the Go net/http package, and the behavior of Goa's SkipResponseBodyEncodeDecode. This proposal adds native support for SSE to Goa, making it trivial to add SSE endpoints to Goa services.

DSL

This proposal introduces a new DSL function ServerSentEvents which tells Goa that a streaming endpoint should be implemented using SSE instead of the default WebSocket implementation:

var Event = Type("Event", func() {
	Description("A notification message sent via SSE")
	Attribute("message", String, "Message body")
	Attribute("timestamp", Int, "Unix timestamp")
	Required("message", "timestamp")
})

Method("stream", func() {
	Description("Stream events using Server-Sent Events")
	StreamingResult(Event) // Specifies a streaming endpoint
	HTTP(func() {
		GET("/events/stream")
		ServerSentEvents() // Use SSE instead of WebSocket
		// Will stream events with the shape:
		// data: {"message":<message>,"timestamp":<timestamp>}
	})
})

ServerSentEvents also makes it possible to specify how to populate the SSE event fields explicitly:

ServerSentEvents("message") // Use the message field to populate the "data" field of SSE events
ServerSentEvents(func() {
    SSEEventData("message") // Use result type "message" attribute to populate "data" SSE event field
    SSEEventType("type")    // Use result type "type" attribute to populate "event" SSE event field
    SSEEventID("id")        // Use result type "id" attribute to populate "id" SSE event field
    SSEEventRetry("retry")  // Use result type "retry" attribute to populate "retry" SSE event field
})

The above is equivalent to:

ServerSentEvents("message", func() {
    SSEEventType("type")    // Use result type "type" attribute to populate "event" SSE event field
    SSEEventID("id")        // Use result type "id" attribute to populate "id" SSE event field
    SSEEventRetry("retry")  // Use result type "retry" attribute to populate "retry" SSE event field
})

Finally, the SSERequestID function makes it possible to map a payload attribute to the "Last-Event-Id" request header:

ServerSentEvents(func() {
    SSERequestID("startID") // Use payload "startID" attribute to populate "Last-Event-Id" request header
})

Generated Code

The generated endpoint method follows the same signature as the WebSocket case:

Stream(context.Context, *StreamPayload, StreamServerStream) (err error)

Where StreamPayload contains any request payload (including the attribute initialized with the value of the "Last-Event-Id" request header if present and defined in the design) and StreamServerStream is the same interface used for WebSockets:

type StreamServerStream interface {
    // Send streams instances of <result>.
    Send(<result>) error
    // SendWithContext streams instances of <result> with context.
    SendWithContext(context.Context, <result>) error
    // Close closes the stream.
    Close() error
}

Where <result> is the type given to the StreamingResult DSL function.

The generated Goa service method can be implemented as follows:

func (s *Service) Stream(ctx context.Context, p *svc.StreamPayload, stream svc.StreamServerStream) error {
	for {
		select {
		case <-ctx.Done():
			// The client disconnected or the request was canceled.
			return nil
		case ev := <-s.events:
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}
}

The code above assumes that the service has an events field, a channel on which the business logic publishes events.
Given the code above, Goa:

  • Writes the default SSE headers. These can be overridden using an HTTP middleware: the generated code won't overwrite headers that already exist.
  • Marshals the business logic event into an SSE event following the attribute names defined in the design.
    The event data attribute gets marshaled using the configured HTTP encoder if it is not a primitive type.
  • Streams the result to the response body.
