diff --git a/.changeset/yellow-files-build.md b/.changeset/yellow-files-build.md new file mode 100644 index 0000000000..9c2c3c4844 --- /dev/null +++ b/.changeset/yellow-files-build.md @@ -0,0 +1,5 @@ +--- +'graphql-yoga': minor +--- + +Support GraphQL over SSE distinct connection mode diff --git a/packages/graphql-yoga/__tests__/graphql-sse.spec.ts b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts new file mode 100644 index 0000000000..e7e1268214 --- /dev/null +++ b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts @@ -0,0 +1,188 @@ +import { createSchema, createYoga } from '../src/index.js' +import { createClient } from 'graphql-sse' + +describe('GraphQL over SSE', () => { + const schema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String! + } + type Subscription { + greetings: String! + waitForPings: String! + } + `, + resolvers: { + Query: { + async hello() { + return 'world' + }, + }, + Subscription: { + greetings: { + async *subscribe() { + for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) { + yield { greetings: hi } + } + }, + }, + waitForPings: { + // eslint-disable-next-line require-yield + async *subscribe() { + // a ping is issued every 100ms, wait for a few and just return + await new Promise((resolve) => setTimeout(resolve, 100 * 3 + 50)) + return + }, + }, + }, + }, + }) + + const yoga = createYoga({ + schema, + legacySse: false, + maskedErrors: false, + }) + + describe('Distinct connections mode', () => { + test('should issue pings while connected', async () => { + const res = await yoga.fetch( + 'http://yoga/graphql?query=subscription{waitForPings}', + { + headers: { + accept: 'text/event-stream', + }, + }, + ) + expect(res.ok).toBeTruthy() + await expect(res.text()).resolves.toMatchInlineSnapshot(` + ": + + : + + : + + event: complete + + " + `) + }) + + it('should support single result operations', async () => { + const client = createClient({ + url: 'http://yoga/graphql', + fetchFn: yoga.fetch, + abortControllerImpl: yoga.fetchAPI.AbortController, + singleConnection: false, // distinct connection mode + retryAttempts: 0, + }) + + await expect( + new Promise((resolve, reject) => { + let result: unknown + client.subscribe( + { + query: /* GraphQL */ ` + { + hello + } + `, + }, + { + next: (msg) => (result = msg), + error: reject, + complete: () => resolve(result), + }, + ) + }), + ).resolves.toMatchInlineSnapshot(` + { + "data": { + "hello": "world", + }, + } + `) + + client.dispose() + }) + + it('should support streaming operations', async () => { + const client = createClient({ + url: 'http://yoga/graphql', + fetchFn: yoga.fetch, + abortControllerImpl: yoga.fetchAPI.AbortController, + singleConnection: false, // distinct connection mode + retryAttempts: 0, + }) + + await expect( + new Promise((resolve, reject) => { + const msgs: unknown[] = [] + client.subscribe( + { + query: /* GraphQL */ ` + subscription { + greetings + } + `, + }, + { + next: (msg) => msgs.push(msg), + error: reject, + complete: () => resolve(msgs), + }, + ) + }), + ).resolves.toMatchInlineSnapshot(` + [ + { + "data": { + "greetings": "Hi", + }, + }, + { + "data": { + "greetings": "Bonjour", + }, + }, + { + "data": { + "greetings": "Hola", + }, + }, + { + "data": { + "greetings": "Ciao", + }, + }, + { + "data": { + "greetings": "Zdravo", + }, + }, + ] + `) + + client.dispose() + }) + + it('should report errors through the stream', async () => { + const res = await yoga.fetch('http://yoga/graphql?query={nope}', { + headers: { + accept: 'text/event-stream', + }, + }) + expect(res.ok).toBeTruthy() + await expect(res.text()).resolves.toMatchInlineSnapshot(` + "event: next + data: {"errors":[{"message":"Cannot query field \\"nope\\" on type \\"Query\\".","locations":[{"line":1,"column":2}]}]} + + event: complete + + " + `) + }) + }) + + it.todo('Single connections mode') +}) diff --git a/packages/graphql-yoga/package.json b/packages/graphql-yoga/package.json index f7c863e03e..73d66d689c 100644 --- a/packages/graphql-yoga/package.json +++ b/packages/graphql-yoga/package.json @@ -73,6 +73,7 @@ "graphql": "^16.0.1", "graphql-http": "^1.7.2", "graphql-scalars": "1.20.4", + "graphql-sse": "^2.0.0", "html-minifier-terser": "7.1.0", "json-bigint-patch": "0.0.8", "puppeteer": "19.6.0" diff --git a/packages/graphql-yoga/src/error.ts b/packages/graphql-yoga/src/error.ts index a035176881..9f82097357 100644 --- a/packages/graphql-yoga/src/error.ts +++ b/packages/graphql-yoga/src/error.ts @@ -110,7 +110,7 @@ export function handleError( export function getResponseInitByRespectingErrors( result: ResultProcessorInput, headers: Record = {}, - isApplicationJson = false, + prefer200 = false, ) { let status: number | undefined let unexpectedErrorExists = false @@ -130,7 +130,7 @@ export function getResponseInitByRespectingErrors( if (error.extensions.http.headers) { Object.assign(headers, error.extensions.http.headers) } - if (isApplicationJson && error.extensions.http.spec) { + if (prefer200 && error.extensions.http.spec) { continue } if ( diff --git a/packages/graphql-yoga/src/plugins/resultProcessor/graphql-sse.ts b/packages/graphql-yoga/src/plugins/resultProcessor/graphql-sse.ts new file mode 100644 index 0000000000..487834e779 --- /dev/null +++ b/packages/graphql-yoga/src/plugins/resultProcessor/graphql-sse.ts @@ -0,0 +1,91 @@ +import { ExecutionResult } from 'graphql' +import { isAsyncIterable } from '@envelop/core' + +import { getResponseInitByRespectingErrors } from '../../error.js' +import { FetchAPI, MaybeArray } from '../../types.js' +import { ResultProcessorInput } from '../types.js' +import { jsonStringifyResultWithoutInternals } from './stringify.js' + +export function processGraphQLSSEResult( + result: ResultProcessorInput, + fetchAPI: FetchAPI, +): Response { + // TODO: implement "single connection mode" + + let pingerMs = 12_000 + + // for testing the pings, reduce the timeout significantly + if (globalThis.process?.env?.NODE_ENV === 'test') { + pingerMs = 100 + } + + const headersInit = { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Content-Encoding': 'none', + } + + const responseInit = getResponseInitByRespectingErrors( + result, + headersInit, + // as per the GraphQL over SSE spec, operation errors must be reported + // through the stream and the response head should always be 200: OK + true, + ) + + let iterator: AsyncIterator> + let pinger: ReturnType + const textEncoder = new fetchAPI.TextEncoder() + const readableStream = new fetchAPI.ReadableStream({ + start(controller) { + // ping client every 12 seconds to keep the connection alive + pinger = setInterval(() => { + if (controller.desiredSize) { + controller.enqueue(textEncoder.encode(':\n\n')) + } else { + // TODO: why disable pinger when no desired size? + clearInterval(pinger) + } + }, pingerMs) + + if (isAsyncIterable(result)) { + iterator = result[Symbol.asyncIterator]() + } else { + let finished = false + iterator = { + next: () => { + if (finished) { + return Promise.resolve({ done: true, value: null }) + } + finished = true + return Promise.resolve({ done: false, value: result }) + }, + } + } + }, + async pull(controller) { + const { done, value } = await iterator.next() + if (value != null) { + controller.enqueue( + textEncoder.encode( + `event: next\ndata: ${jsonStringifyResultWithoutInternals( + value, + )}\n\n`, + ), + ) + } + if (done) { + clearInterval(pinger) + controller.enqueue(textEncoder.encode('event: complete\n\n')) + controller.close() + } + }, + async cancel(e) { + clearInterval(pinger) + await iterator.return?.(e) + }, + }) + + return new fetchAPI.Response(readableStream, responseInit) +} diff --git a/packages/graphql-yoga/src/plugins/useResultProcessor.ts b/packages/graphql-yoga/src/plugins/useResultProcessor.ts index 66a7b785b0..822e5276c4 100644 --- a/packages/graphql-yoga/src/plugins/useResultProcessor.ts +++ b/packages/graphql-yoga/src/plugins/useResultProcessor.ts @@ -6,6 +6,7 @@ import { } from './resultProcessor/accept.js' import { processMultipartResult } from './resultProcessor/multipart.js' import { processPushResult } from './resultProcessor/push.js' +import { processGraphQLSSEResult } from './resultProcessor/graphql-sse.js' import { processRegularResult } from './resultProcessor/regular.js' import { Plugin, ResultProcessor } from './types.js' @@ -36,7 +37,7 @@ const regular: ResultProcessorConfig = { const defaultList = [textEventStream, multipart, regular] const subscriptionList = [multipart, textEventStream, regular] -export function useResultProcessors(): Plugin { +export function useResultProcessors(opts: { legacySse: boolean }): Plugin { const isSubscriptionRequestMap = new WeakMap() return { onSubscribe({ args: { contextValue } }) { @@ -65,7 +66,10 @@ export function useResultProcessors(): Plugin { acceptableMediaTypes.push(processorMediaType) if (isMatchingMediaType(processorMediaType, requestMediaType)) { setResultProcessor( - resultProcessorConfig.processResult, + !opts.legacySse && + resultProcessorConfig.processResult === processPushResult + ? processGraphQLSSEResult + : resultProcessorConfig.processResult, processorMediaType, ) } diff --git a/packages/graphql-yoga/src/server.ts b/packages/graphql-yoga/src/server.ts index 2d028f056e..309a7d5bee 100644 --- a/packages/graphql-yoga/src/server.ts +++ b/packages/graphql-yoga/src/server.ts @@ -170,6 +170,16 @@ export type YogaServerOptions = { * @default false */ batching?: BatchingOptions + /** + * Use the legacy Yoga SSE and not the [GraphQL over SSE spec](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md) for streaming results. + * + * @note Currently GraphQL Yoga supports exclusively the ["distinct connections mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode). For using the ["distinct connection mode"](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connections-mode) please [use the `@graphql-yoga/plugin-graphql-sse` plugin](https://the-guild.dev/graphql/yoga-server/docs/features/subscriptions#graphql-over-server-sent-events-protocol-via-graphql-sse). + * + * @default true + * + * @deprecated Consider using GraphQL over SSE spec instead by setting this flag to `false`. Starting with the next major release, this flag will default to `false`. + */ + legacySse?: boolean } export type BatchingOptions = @@ -344,7 +354,9 @@ export class YogaServer< parse: parsePOSTFormUrlEncodedRequest, }), // Middlewares after the GraphQL execution - useResultProcessors(), + useResultProcessors({ + legacySse: options?.legacySse == null ? true : options.legacySse, + }), useErrorHandling((error, request) => { const errors = handleError(error, this.maskedErrorsOpts, this.logger) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ee89b6d7b..3259d75bad 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -6,12 +6,12 @@ overrides: '@changesets/assemble-release-plan': 5.2.1 patchedDependencies: - '@changesets/assemble-release-plan@5.2.1': - hash: gxgixp7rzwjkputc2n2g47utqy - path: patches/@changesets__assemble-release-plan@5.2.1.patch '@graphiql/react@0.13.3': hash: yqm362ey3efuxzwgojxfo6tq5i path: patches/@graphiql__react@0.13.3.patch + '@changesets/assemble-release-plan@5.2.1': + hash: gxgixp7rzwjkputc2n2g47utqy + path: patches/@changesets__assemble-release-plan@5.2.1.patch importers: @@ -1064,6 +1064,7 @@ importers: graphql: 16.6.0 graphql-http: ^1.7.2 graphql-scalars: 1.20.4 + graphql-sse: ^2.0.0 html-minifier-terser: 7.1.0 json-bigint-patch: 0.0.8 lru-cache: ^8.0.0 @@ -1093,6 +1094,7 @@ importers: graphql: 16.6.0 graphql-http: 1.7.2_graphql@16.6.0 graphql-scalars: 1.20.4_graphql@16.6.0 + graphql-sse: 2.0.0_graphql@16.6.0 html-minifier-terser: 7.1.0 json-bigint-patch: 0.0.8 puppeteer: 19.6.0 @@ -18061,6 +18063,15 @@ packages: graphql: '>=0.11 <=16' dev: false + /graphql-sse/2.0.0_graphql@16.6.0: + resolution: {integrity: sha512-TTdFwxGM9RY68s22XWyhc+SyQn3PLbELDD2So0K6Cc6EIlBAyPuNV8VlPfNKa/la7gEf2SwHY7JoJplOmOY4LA==} + engines: {node: '>=12'} + peerDependencies: + graphql: '>=0.11 <=16' + dependencies: + graphql: 16.6.0 + dev: true + /graphql-tag/2.12.6: resolution: {integrity: sha512-FdSNcu2QQcWnM2VNvSCCDCVS5PpPqpzgFT8+GXzqJuoDd0CBncxCY278u4mhRO7tMgo2JjgJA5aZ+nWSQ/Z+xg==} engines: {node: '>=10'} diff --git a/website/src/pages/docs/features/subscriptions.mdx b/website/src/pages/docs/features/subscriptions.mdx index 0b03145e4e..b741fdd876 100644 --- a/website/src/pages/docs/features/subscriptions.mdx +++ b/website/src/pages/docs/features/subscriptions.mdx @@ -371,6 +371,30 @@ const network = Network.create(executeQueryOrMutation, executeSubscription) ## GraphQL over Server-Sent Events Protocol (via `graphql-sse`) +There are two different modes in [GraphQL over Server-Sent Events spec](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md). + +### Distinct Connections Mode + +GraphQL Yoga supports [GraphQL over Server-Sent Events spec](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode) only in "distinct connections mode" with the `legacySse: false` flag. + +```ts filename="yoga.ts" +import { createServer } from 'node:http' +import { createYoga } from 'graphql-yoga' + +const yogaApp = createYoga({ + // ... + legacySse: false +}) +``` + + + When you enable this, simple SSE recipes won't work because it changes the + data events format. You should use refer to [`graphql-sse` client + recipes](https://github.com/enisdenjo/graphql-sse#recipes). + + +### Single Connection Mode + In case you want the subscriptions to be transported following the [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md), you simply use the `@graphql-yoga/plugin-graphql-sse` plugin for GraphQL Yoga that exposes an additional endpoint (defaulting to `/graphql/stream`) used for [graphql-sse](https://github.com/enisdenjo/graphql-sse) clients. The plugin will hijack the request from the `onRequest` hook and will use **all** envelop plugins provided.