Skip to content

feat: allow runtime errors and validation errors in onSubscribe #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 25 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
# Graphql Lambda Subscriptions
[![Release](https://github.com/reconbot/graphql-lambda-subscriptions/actions/workflows/test.yml/badge.svg)](https://github.com/reconbot/graphql-lambda-subscriptions/actions/workflows/test.yml)

This is a fork of [subscriptionless](https://github.com/andyrichardson/subscriptionless) that is built to work with [Architect](https://arc.codes) and tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox). There's no reason why it wont work with Serverless or other deploy tools but their support is not a goal.
This is a fork of [`subscriptionless`](https://github.com/andyrichardson/subscriptionless) and is a Amazon Lambda Serverless equivalent to [graphQL-ws](https://github.com/enisdenjo/graphql-ws). It follows the [`graphql-ws prototcol`](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md). It is tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox) against `graphql-ws` directly and run in production today. For many applications `graphql-lambda-subscriptions` should do what `graphql-ws` does for you today without having to run a server.

As `subscriptionless`'s tagline goes;

> Have all the functionality of GraphQL subscriptions on a stateful server without the cost.

## Why a fork?

I had different requirements and needed more features. This project wouldn't exist without `subscriptionless` and you should totally check it out.

## Features

- Only needs DynamoDB, API Gateway and Lambda (no app sync or other platform required, can use step functions for ping/pong support)
- Provides a Pub/Sub system to broadcast events to subscriptions
- Provides hooks for the full lifecycle of a subscription
- Type compatible with GraphQL and [`nexus.js`](https://nexusjs.org)

## Quick Start

Since there are many ways to deploy to amazon lambda I'm going to have to get opinionated in the quickstart and pick [Architect](https://arc.codes). `graphql-lambda-subscriptions` should work on Lambda regardless of your deployment and packaging framework. Take a look at the [arc-basic-events](mocks/arc-basic-events) mock used for integration testing for an example of using it with Architect.

More to come...

## API

This should be generated...

### `subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable`

Subscribe is the most important method in the library. It's the primary difference between `graphql-ws` and `graphql-lambda-subscriptions`. It returns a `SubscribePseudoIterable` that pretends to be an async iterator that you put on the `subscribe` resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions.
Expand All @@ -20,40 +43,12 @@ interface SubscribeOptions {

- `topic`: The you subscribe to the topic and can filter based upon the topics payload.
- `filter`: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches the subscription will receive the event. If the payload is missing the field the subscription will receive the event.
- `onSubscribe`: A function that gets the subscription information (like arguments) it can throw if you don't want the subscription to subscribe.
- `onSubscribe`: A function that gets the subscription information (like arguments) it can return an array of errors if you don't want the subscription to subscribe.
- `onAfterSubscribe`: A function that gets the subscription information (like arguments) and can fire initial events or record information.
- `onComplete`: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of aws lambda, it's possible for a client to send a "complete" message and disconnect and those events executing on lambda out of order. Which why this function can be called up to twice.

## Old Readme

## About

GraphQL subscriptions for AWS Lambda and API Gateway WebSockets.

Have all the functionality of GraphQL subscriptions on a stateful server without the cost.

> Note: This project uses the [graphql-ws protocol](https://github.com/enisdenjo/graphql-ws) under the hood.

## ⚠️ Limitations

Seriously, **read this first** before you even think about using this.

<details>

<summary>This is in alpha</summary>

This is Alpha software and should be treated as such.

</details>

<details>

<summary>AWS API Gateway Limitations</summary>

There are a few noteworthy limitations to the AWS API Gateway WebSocket implementation.

> Note: If you work on AWS and want to run through this, hit me up!

#### Ping/Pong

For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong.
Expand All @@ -74,7 +69,6 @@ API Gateway's current socket closing functionality doesn't support any kind of m

Because of this limitation, there is no clear way to communicate subprotocol errors to the client. In the case of a subprotocol error the socket will be closed by the server (with no meaningful disconnect payload).

</details>

## Setup

Expand Down
97 changes: 49 additions & 48 deletions lib/messages/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ describe('messages/subscribe', () => {
delete: [],
})
})

it('calls the global error callback server errors', async () => {
const event: any = { requestContext: { connectedAt: 1628889982819, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'b6o5BPxb3', requestId: 'MaEe0DVon', requestTimeEpoch: 1628889983319, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"abcdefg","type":"subscribe","payload":{"query":"{ hello }"}}' }
let error: any = null
Expand All @@ -108,59 +109,59 @@ describe('messages/subscribe', () => {
await subscribe({ server, event, message: JSON.parse(event.body) })
assert.match(error.message, /postToConnection Error/ )
})

describe('callbacks', () => {
// this test doesn't make sense anymore because these error are now sent as error messages
// it('fires onSubscribe before subscribing', async () => {
it('fires onSubscribe before subscribing', async () => {
const onSubscribe: string[] = []

// const onSubscribe: string[] = []
const typeDefs = `
type Query {
hello: String
}
type Subscription {
greetings: String
}
`
const resolvers = {
Query: {
hello: () => 'Hello World!',
},
Subscription: {
greetings:{
subscribe: pubsubSubscribe('greetings', {
onSubscribe() {
onSubscribe.push('We did it!')
throw new Error('don\'t subscribe!')
},
}),
resolve: ({ payload }) => {
return payload
},
},
},
}

// const typeDefs = `
// type Query {
// hello: String
// }
// type Subscription {
// greetings: String
// }
// `
// const resolvers = {
// Query: {
// hello: () => 'Hello World!',
// },
// Subscription: {
// greetings:{
// subscribe: pubsubSubscribe('greetings', {
// onSubscribe() {
// onSubscribe.push('We did it!')
// throw new Error('don\'t subscribe!')
// },
// }),
// resolve: ({ payload }) => {
// return payload
// },
// },
// },
// }
const schema = makeExecutableSchema({
typeDefs,
resolvers,
})
const server = await mockServerContext({
schema,
})
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }

// const schema = makeExecutableSchema({
// typeDefs,
// resolvers,
// })
// const server = await mockServerContext({
// schema,
// })
// const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
try {
await subscribe({ server, event, message: JSON.parse(event.body) })
throw new Error('should not have subscribed')
} catch (error) {
assert.equal(error.message, 'don\'t subscribe!')
}
assert.deepEqual(onSubscribe, ['We did it!'])
const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
assert.isEmpty(subscriptions)
})

// await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
// try {
// await subscribe({ server, event, message: JSON.parse(event.body) })
// throw new Error('should not have subscribed')
// } catch (error) {
// assert.equal(error.message, 'don\'t subscribe!')
// }
// assert.deepEqual(onSubscribe, ['We did it!'])
// const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
// assert.isEmpty(subscriptions)
// })
it('fires onAfterSubscribe after subscribing', async () => {
const events: string[] = []

Expand Down
13 changes: 6 additions & 7 deletions lib/messages/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SubscribeMessage, MessageType } from 'graphql-ws'
import { validate, parse, GraphQLError } from 'graphql'
import { validate, parse } from 'graphql'
import {
buildExecutionContext,
assertValidExecutionArguments,
Expand Down Expand Up @@ -83,17 +83,16 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve

const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable<PubSubEvent>

try {
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
await onSubscribe?.(root, args, context, info)
} catch (error) {
server.log('onSubscribe', { error })
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
const onSubscribeErrors = await onSubscribe?.(root, args, context, info)
if (onSubscribeErrors){
server.log('onSubscribe', { onSubscribeErrors })
return sendMessage(server)({
...event.requestContext,
message: {
type: MessageType.Error,
id: message.id,
payload: [new GraphQLError(error.message)],
payload: onSubscribeErrors,
},
})
}
Expand Down
5 changes: 5 additions & 0 deletions lib/pubsub/getFilteredSubs-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ describe('collapseKeys', () => {
})
})
})

describe('getFilteredSubs', () => {
it('can match on payload')
it('can match on connectionId')
})
13 changes: 13 additions & 0 deletions lib/test/execute-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ const messageToString = (message) => {
export const executeQuery = async function* (query: string, {
url = URL,
stayConnected = false,
timeout = 20_000,
}: {
url?: string
stayConnected?: boolean
timeout?: number
} = {}): AsyncGenerator<unknown, void, unknown> {
let id = 1
const ws = new WebSocket(url, 'graphql-transport-ws')
Expand All @@ -40,6 +42,14 @@ export const executeQuery = async function* (query: string, {
incomingMessages.queueReturn()
})

let timer: NodeJS.Timeout|null = null
if (timeout) {
timer = setTimeout(() => {
incomingMessages.queueValue({ type: 'timeout', timeout })
incomingMessages.queueReturn()
}, timeout)
}

const send = (data: any) => new Promise<void>(resolve => ws.send(JSON.stringify(data), () => resolve()))

await new Promise(resolve => ws.on('open', resolve))
Expand All @@ -65,6 +75,9 @@ export const executeQuery = async function* (query: string, {
if (!stayConnected){
ws.close()
}
if (timer) {
clearTimeout(timer)
}
}


Expand Down
25 changes: 22 additions & 3 deletions lib/test/graphql-ws-schema.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ws from 'ws'
import { useServer } from 'graphql-ws/lib/use/ws'
import { makeExecutableSchema } from '@graphql-tools/schema'
import { GraphQLError } from 'graphql'

const PORT = 4000

Expand All @@ -11,6 +12,7 @@ const typeDefs = `
type Subscription {
greetings: String
onSubscribeError: String
onResolveError: String
}
`

Expand All @@ -32,6 +34,14 @@ const resolvers = {
throw new Error('onSubscribeError')
},
},
onResolveError: {
subscribe: async function*(){
yield { greetings: 'yoyo' }
},
resolve() {
throw new Error('resolver error')
},
},
},
}

Expand Down Expand Up @@ -62,17 +72,26 @@ export const startGqlWSServer = async () => {
})

useServer(
{ schema },
{
schema,
async onSubscribe(ctx, message) {
if (message?.payload?.query === 'subscription { onSubscribeError }') {
return [
new GraphQLError('onSubscribeError'),
]
}
},
},
server,
)

await new Promise(resolve => server.on('listening', resolve))
// console.log('server started')

const close = () => new Promise<void>(resolve => server.close(() => resolve()))
const stop = () => new Promise<void>(resolve => server.close(() => resolve()))

return {
url: `ws://localhost:${PORT}`,
close,
stop,
}
}
Loading