Skip to content

GraphQL over SSE distinct connection mode #2463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/yellow-files-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'graphql-yoga': minor
---

Support GraphQL over SSE distinct connection mode
188 changes: 188 additions & 0 deletions packages/graphql-yoga/__tests__/graphql-sse.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { createSchema, createYoga } from '../src/index.js'
import { createClient } from 'graphql-sse'

describe('GraphQL over SSE', () => {
const schema = createSchema({
typeDefs: /* GraphQL */ `
type Query {
hello: String!
}
type Subscription {
greetings: String!
waitForPings: String!
}
`,
resolvers: {
Query: {
async hello() {
return 'world'
},
},
Subscription: {
greetings: {
async *subscribe() {
for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) {
yield { greetings: hi }
}
},
},
waitForPings: {
// eslint-disable-next-line require-yield
async *subscribe() {
// a ping is issued every 100ms, wait for a few and just return
await new Promise((resolve) => setTimeout(resolve, 100 * 3 + 50))
return
},
},
},
},
})

const yoga = createYoga({
schema,
legacySse: false,
maskedErrors: false,
})

describe('Distinct connections mode', () => {
test('should issue pings while connected', async () => {
const res = await yoga.fetch(
'http://yoga/graphql?query=subscription{waitForPings}',
{
headers: {
accept: 'text/event-stream',
},
},
)
expect(res.ok).toBeTruthy()
await expect(res.text()).resolves.toMatchInlineSnapshot(`
":

:

:

event: complete

"
`)
})

it('should support single result operations', async () => {
const client = createClient({
url: 'http://yoga/graphql',
fetchFn: yoga.fetch,
abortControllerImpl: yoga.fetchAPI.AbortController,
singleConnection: false, // distinct connection mode
retryAttempts: 0,
})

await expect(
new Promise((resolve, reject) => {
let result: unknown
client.subscribe(
{
query: /* GraphQL */ `
{
hello
}
`,
},
{
next: (msg) => (result = msg),
error: reject,
complete: () => resolve(result),
},
)
}),
).resolves.toMatchInlineSnapshot(`
{
"data": {
"hello": "world",
},
}
`)

client.dispose()
})

it('should support streaming operations', async () => {
const client = createClient({
url: 'http://yoga/graphql',
fetchFn: yoga.fetch,
abortControllerImpl: yoga.fetchAPI.AbortController,
singleConnection: false, // distinct connection mode
retryAttempts: 0,
})

await expect(
new Promise((resolve, reject) => {
const msgs: unknown[] = []
client.subscribe(
{
query: /* GraphQL */ `
subscription {
greetings
}
`,
},
{
next: (msg) => msgs.push(msg),
error: reject,
complete: () => resolve(msgs),
},
)
}),
).resolves.toMatchInlineSnapshot(`
[
{
"data": {
"greetings": "Hi",
},
},
{
"data": {
"greetings": "Bonjour",
},
},
{
"data": {
"greetings": "Hola",
},
},
{
"data": {
"greetings": "Ciao",
},
},
{
"data": {
"greetings": "Zdravo",
},
},
]
`)

client.dispose()
})

it('should report errors through the stream', async () => {
const res = await yoga.fetch('http://yoga/graphql?query={nope}', {
headers: {
accept: 'text/event-stream',
},
})
expect(res.ok).toBeTruthy()
await expect(res.text()).resolves.toMatchInlineSnapshot(`
"event: next
data: {"errors":[{"message":"Cannot query field \\"nope\\" on type \\"Query\\".","locations":[{"line":1,"column":2}]}]}

event: complete

"
`)
})
})

it.todo('Single connections mode')
})
1 change: 1 addition & 0 deletions packages/graphql-yoga/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"graphql": "^16.0.1",
"graphql-http": "^1.7.2",
"graphql-scalars": "1.20.4",
"graphql-sse": "^2.0.0",
"html-minifier-terser": "7.1.0",
"json-bigint-patch": "0.0.8",
"puppeteer": "19.6.0"
Expand Down
4 changes: 2 additions & 2 deletions packages/graphql-yoga/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export function handleError(
export function getResponseInitByRespectingErrors(
result: ResultProcessorInput,
headers: Record<string, string> = {},
isApplicationJson = false,
prefer200 = false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment explaining this flag

) {
let status: number | undefined
let unexpectedErrorExists = false
Expand All @@ -130,7 +130,7 @@ export function getResponseInitByRespectingErrors(
if (error.extensions.http.headers) {
Object.assign(headers, error.extensions.http.headers)
}
if (isApplicationJson && error.extensions.http.spec) {
if (prefer200 && error.extensions.http.spec) {
continue
}
if (
Expand Down
91 changes: 91 additions & 0 deletions packages/graphql-yoga/src/plugins/resultProcessor/graphql-sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { ExecutionResult } from 'graphql'
import { isAsyncIterable } from '@envelop/core'

import { getResponseInitByRespectingErrors } from '../../error.js'
import { FetchAPI, MaybeArray } from '../../types.js'
import { ResultProcessorInput } from '../types.js'
import { jsonStringifyResultWithoutInternals } from './stringify.js'

export function processGraphQLSSEResult(
result: ResultProcessorInput,
fetchAPI: FetchAPI,
): Response {
// TODO: implement "single connection mode"

let pingerMs = 12_000

// for testing the pings, reduce the timeout significantly
if (globalThis.process?.env?.NODE_ENV === 'test') {
pingerMs = 100
}

const headersInit = {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
'Content-Encoding': 'none',
}

const responseInit = getResponseInitByRespectingErrors(
result,
headersInit,
// as per the GraphQL over SSE spec, operation errors must be reported
// through the stream and the response head should always be 200: OK
true,
)

let iterator: AsyncIterator<MaybeArray<ExecutionResult>>
let pinger: ReturnType<typeof setInterval>
const textEncoder = new fetchAPI.TextEncoder()
const readableStream = new fetchAPI.ReadableStream({
start(controller) {
// ping client every 12 seconds to keep the connection alive
pinger = setInterval(() => {
if (controller.desiredSize) {
controller.enqueue(textEncoder.encode(':\n\n'))
} else {
// TODO: why disable pinger when no desired size?
clearInterval(pinger)
}
}, pingerMs)

if (isAsyncIterable(result)) {
iterator = result[Symbol.asyncIterator]()
} else {
let finished = false
iterator = {
next: () => {
if (finished) {
return Promise.resolve({ done: true, value: null })
}
finished = true
return Promise.resolve({ done: false, value: result })
},
}
}
},
async pull(controller) {
const { done, value } = await iterator.next()
if (value != null) {
controller.enqueue(
textEncoder.encode(
`event: next\ndata: ${jsonStringifyResultWithoutInternals(
value,
)}\n\n`,
),
)
}
if (done) {
clearInterval(pinger)
controller.enqueue(textEncoder.encode('event: complete\n\n'))
controller.close()
}
},
async cancel(e) {
clearInterval(pinger)
await iterator.return?.(e)
},
})

return new fetchAPI.Response(readableStream, responseInit)
}
8 changes: 6 additions & 2 deletions packages/graphql-yoga/src/plugins/useResultProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from './resultProcessor/accept.js'
import { processMultipartResult } from './resultProcessor/multipart.js'
import { processPushResult } from './resultProcessor/push.js'
import { processGraphQLSSEResult } from './resultProcessor/graphql-sse.js'
import { processRegularResult } from './resultProcessor/regular.js'
import { Plugin, ResultProcessor } from './types.js'

Expand Down Expand Up @@ -36,7 +37,7 @@ const regular: ResultProcessorConfig = {
const defaultList = [textEventStream, multipart, regular]
const subscriptionList = [multipart, textEventStream, regular]

export function useResultProcessors(): Plugin {
export function useResultProcessors(opts: { legacySse: boolean }): Plugin {
const isSubscriptionRequestMap = new WeakMap<Request, boolean>()
return {
onSubscribe({ args: { contextValue } }) {
Expand Down Expand Up @@ -65,7 +66,10 @@ export function useResultProcessors(): Plugin {
acceptableMediaTypes.push(processorMediaType)
if (isMatchingMediaType(processorMediaType, requestMediaType)) {
setResultProcessor(
resultProcessorConfig.processResult,
!opts.legacySse &&
resultProcessorConfig.processResult === processPushResult
? processGraphQLSSEResult
: resultProcessorConfig.processResult,
processorMediaType,
)
}
Expand Down
14 changes: 13 additions & 1 deletion packages/graphql-yoga/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ export type YogaServerOptions<TServerContext, TUserContext> = {
* @default false
*/
batching?: BatchingOptions
/**
* Use the legacy Yoga SSE and not the [GraphQL over SSE spec](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md) for streaming results.
*
* @note Currently GraphQL Yoga supports exclusively the ["distinct connections mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode). For using the ["distinct connection mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connections-mode) please [use the `@graphql-yoga/plugin-graphql-sse` plugin](https://the-guild.dev/graphql/yoga-server/docs/features/subscriptions#graphql-over-server-sent-events-protocol-via-graphql-sse).
*
* @default true
*
* @deprecated Consider using GraphQL over SSE spec instead by setting this flag to `false`. Starting with the next major release, this flag will default to `false`.
*/
legacySse?: boolean
}

export type BatchingOptions =
Expand Down Expand Up @@ -344,7 +354,9 @@ export class YogaServer<
parse: parsePOSTFormUrlEncodedRequest,
}),
// Middlewares after the GraphQL execution
useResultProcessors(),
useResultProcessors({
legacySse: options?.legacySse == null ? true : options.legacySse,
}),
useErrorHandling((error, request) => {
const errors = handleError(error, this.maskedErrorsOpts, this.logger)

Expand Down
Loading