Graphql Lambda Subscriptions

This is a fork of subscriptionless and is an AWS Lambda serverless equivalent to graphql-ws. It follows the graphql-ws protocol. It is tested with the Architect Sandbox against graphql-ws directly and runs in production today. For many applications, graphql-lambda-subscriptions should do what graphql-ws does for you today without having to run a server.

As subscriptionless's tagline goes:

Have all the functionality of GraphQL subscriptions on a stateful server without the cost.

Why a fork?

I had different requirements and needed more features. This project wouldn't exist without subscriptionless and you should totally check it out.

Features

  • Only needs DynamoDB, API Gateway and Lambda (no AppSync or other platform required; Step Functions can be used for ping/pong support)
  • Provides a Pub/Sub system to broadcast events to subscriptions
  • Provides hooks for the full lifecycle of a subscription
  • Type compatible with GraphQL and nexus.js

Quick Start

Since there are many ways to deploy to AWS Lambda, I'm going to get opinionated in the quick start and pick Architect. graphql-lambda-subscriptions should work on Lambda regardless of your deployment and packaging framework. Take a look at the arc-basic-events mock used for integration testing for an example of using it with Architect.

More to come...

API

This should be generated...

subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable

Subscribe is the most important method in the library. It's the primary difference between graphql-ws and graphql-lambda-subscriptions. It returns a SubscribePseudoIterable that pretends to be an async iterator that you put on the subscribe resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions.

interface SubscribeOptions {
    filter?: (...args: TSubscribeArgs) => MaybePromise<Partial<Payload>|void>;
    onSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
    onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
    onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>;
}
  • topic: The topic to subscribe to. Events can be filtered based on the topic's payload.
  • filter: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches, the subscription will receive the event. If the payload is missing the field, the subscription will also receive the event.
  • onSubscribe: A function that gets the subscription information (like arguments). It can return an array of errors if you don't want the subscription to subscribe.
  • onAfterSubscribe: A function that gets the subscription information (like arguments) and can fire initial events or record information.
  • onComplete: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of AWS Lambda, a client can send a "complete" message and then disconnect, and those two events may execute on Lambda out of order, which is why this function can be called up to twice.
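The filter rule described above can be sketched as a small matching function. This only illustrates the stated semantics (a matching field passes, a missing field also passes) and is not the library's implementation; matchesFilter and its types are hypothetical names, and comparing nested objects field by field is an assumption.

```typescript
// Hypothetical helper illustrating the documented filter rule; not part of the library.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

function isJsonObject(value: Json | undefined): value is JsonObject {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Returns true when the subscription should receive the event.
function matchesFilter(payload: JsonObject, filter: JsonObject): boolean {
  return Object.keys(filter).every((key) => {
    const expected: Json | undefined = filter[key];
    const actual: Json | undefined = payload[key];
    // A field missing from the payload does not exclude the event.
    if (actual === undefined) return true;
    // Assumption: nested filter objects are compared field by field.
    if (isJsonObject(expected) && isJsonObject(actual)) {
      return matchesFilter(actual, expected);
    }
    // Everything else is compared with strict equality.
    return actual === expected;
  });
}

const userFilter: JsonObject = { userId: 'u1', meta: { kind: 'chat' } };
console.log(matchesFilter({ userId: 'u1', meta: { kind: 'chat' } }, userFilter)); // true
console.log(matchesFilter({ userId: 'u2' }, userFilter)); // false
console.log(matchesFilter({ text: 'hi' }, userFilter)); // true, the filtered fields are missing
```

Note that under this rule a payload with none of the filtered fields reaches every subscriber, so publishers that rely on filtering for isolation should always include those fields.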

Old Readme

Ping/Pong

For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong.

This means early detection of unclean client disconnects is near impossible (graphql-ws will not implement subprotocol level ping/pong).

Socket idleness

API Gateway considers an idle connection to be one where no messages have been sent on the socket for a fixed duration (currently 10 minutes).

Again, the WebSocket spec has support for detecting idle connections (ping/pong) but API Gateway doesn't use it. This means, in the case where both parties are connected, and no message is sent on the socket for the defined duration (direction agnostic), API Gateway will close the socket.

A quick fix for this is to set up immediate reconnection on the client side.
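As a rough illustration of immediate reconnection, the sketch below reopens a socket as soon as it closes, up to a fixed budget. SocketLike and keepConnected are hypothetical names, and the socket factory is injected so the example runs without a network; a real client would also need to resubscribe after reconnecting.

```typescript
// Minimal socket shape used by this sketch (hypothetical).
interface SocketLike {
  onclose: ((event: unknown) => void) | null;
}

// Hypothetical helper: opens a socket and reopens it immediately whenever it closes.
function keepConnected(
  open: () => SocketLike,
  maxReconnects: number,
): { reconnects: () => number } {
  let reconnects = 0;
  const connect = (): void => {
    const socket = open();
    socket.onclose = () => {
      if (reconnects >= maxReconnects) return; // stop once the budget is used up
      reconnects += 1;
      connect();
    };
  };
  connect();
  return { reconnects: () => reconnects };
}

// Fake sockets stand in for real connections.
const opened: SocketLike[] = [];
const connection = keepConnected(() => {
  const socket: SocketLike = { onclose: null };
  opened.push(socket);
  return socket;
}, 3);

// Simulate API Gateway closing the idle connection.
opened[0]?.onclose?.({});
console.log(opened.length, connection.reconnects()); // 2 1
```

If you use the graphql-ws client, its retryAttempts option covers similar ground, so a hand-rolled loop like this may not be needed.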

Socket errors

API Gateway's current socket closing functionality doesn't support any kind of message/payload. Along with this, graphql-ws won't support error messages.

Because of this limitation, there is no clear way to communicate subprotocol errors to the client. In the case of a subprotocol error the socket will be closed by the server (with no meaningful disconnect payload).

Setup

Create a subscriptionless instance.

import { createInstance } from 'subscriptionless';

const instance = createInstance({
  dynamodb,
  schema,
});

Export the handler.

export const handler = instance.handler;

Configure API Gateway

Set up API Gateway to route WebSocket events to the exported handler.

Serverless framework example.

functions:
  websocket:
    name: my-subscription-lambda
    handler: ./handler.handler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect
      - websocket:
          route: $default

Create DynamoDB tables for state

In-flight connections and subscriptions need to be persisted.

📖 Changing DynamoDB table names

Use the tableNames argument to override the default table names.

const instance = createInstance({
  /* ... */
  tableNames: {
    connections: 'my_connections',
    subscriptions: 'my_subscriptions',
  },
});
💾 serverless framework example
resources:
  Resources:
    # Table for tracking connections
    connectionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.CONNECTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
    # Table for tracking subscriptions
    subscriptionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
          - AttributeName: topic
            AttributeType: S
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
          - AttributeName: topic
            KeyType: RANGE
        GlobalSecondaryIndexes:
          - IndexName: ConnectionIndex
            KeySchema:
              - AttributeName: connectionId
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
          - IndexName: TopicIndex
            KeySchema:
              - AttributeName: topic
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
💾 terraform example
resource "aws_dynamodb_table" "connections-table" {
  name           = "subscriptionless_connections"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key = "id"

  attribute {
    name = "id"
    type = "S"
  }
}

resource "aws_dynamodb_table" "subscriptions-table" {
  name           = "subscriptionless_subscriptions"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key = "id"
  range_key = "topic"

  attribute {
    name = "id"
    type = "S"
  }

  attribute {
    name = "topic"
    type = "S"
  }

  attribute {
    name = "connectionId"
    type = "S"
  }

  global_secondary_index {
    name               = "ConnectionIndex"
    hash_key           = "connectionId"
    write_capacity     = 1
    read_capacity      = 1
    projection_type    = "ALL"
  }

  global_secondary_index {
    name               = "TopicIndex"
    hash_key           = "topic"
    write_capacity     = 1
    read_capacity      = 1
    projection_type    = "ALL"
  }
}

Usage

PubSub

subscriptionless uses its own PubSub implementation which loosely implements the Apollo PubSub Interface.

Note: Unlike the Apollo PubSub library, this implementation is (mostly) stateless

📖 Subscribing to topics

Use the subscribe function to associate incoming subscriptions with a topic.

import { subscribe } from 'subscriptionless/subscribe';

export const resolver = {
  Subscribe: {
    mySubscription: {
      resolve: (event, args, context) => {/* ... */},
      subscribe: subscribe('MY_TOPIC'),
    }
  }
}
📖 Filtering events

Wrap any subscribe function call in a withFilter to provide filter conditions.

Note: If a function is provided, it will be called on subscription start and must return a serializable object.

import { subscribe, withFilter } from 'subscriptionless/subscribe';

// Subscription agnostic filter
withFilter(subscribe('MY_TOPIC'), {
  attr1: '`attr1` must have this value',
  attr2: {
    attr3: 'Nested attributes work fine',
  },
});

// Subscription specific filter
withFilter(subscribe('MY_TOPIC'), (root, args, context, info) => ({
  userId: args.userId,
}));
📖 Concatenating topic subscriptions

Join multiple topic subscriptions together using concat.

import { concat, subscribe } from 'subscriptionless/subscribe';

concat(subscribe('TOPIC_1'), subscribe('TOPIC_2'));
📖 Publishing events

Use the publish method on your subscriptionless instance to publish events to active subscriptions.

instance.publish({
  topic: 'MY_TOPIC',
  payload: 'HELLO',
});

Events can come from many sources:

// SNS Event
export const snsHandler = (event) =>
  Promise.all(
    event.Records.map((r) =>
      instance.publish({
        topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1), // Get topic name (e.g. "MY_TOPIC")
        payload: JSON.parse(r.Sns.Message),
      })
    )
  );

// Manual Invocation
export const invocationHandler = (payload) =>
  instance.publish({ topic: 'MY_TOPIC', payload });
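The SNS handler above derives the topic name from the last colon-separated segment of the topic ARN. Pulled out on its own, that mapping might look like the sketch below; topicNameFromArn, toPublishInputs and the two interfaces are hypothetical names, and only the record fields used above are typed.

```typescript
// Only the SNS record fields used in the handler above.
interface SnsRecord {
  Sns: { TopicArn: string; Message: string };
}

// Shape of the object passed to publish in the examples above.
interface PublishInput {
  topic: string;
  payload: unknown;
}

// Hypothetical helper: "arn:aws:sns:<region>:<account>:<name>" -> "<name>".
function topicNameFromArn(topicArn: string): string {
  return topicArn.substring(topicArn.lastIndexOf(':') + 1);
}

// Hypothetical helper: converts SNS records into publishable events.
function toPublishInputs(records: SnsRecord[]): PublishInput[] {
  return records.map((record) => ({
    topic: topicNameFromArn(record.Sns.TopicArn),
    payload: JSON.parse(record.Sns.Message) as unknown,
  }));
}

console.log(topicNameFromArn('arn:aws:sns:us-east-1:123456789012:MY_TOPIC')); // MY_TOPIC
```

Keeping the conversion separate from the publish call makes it possible to test the ARN parsing without a running instance.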

Context

Context values are accessible in all resolver level functions (resolve, subscribe, onSubscribe and onComplete).

📖 Default value

Assuming no context argument is provided, the default value is an object containing a connectionParams attribute.

This attribute contains the (optionally parsed) payload from connection_init.

export const resolver = {
  Subscribe: {
    mySubscription: {
      resolve: (event, args, context) => {
        console.log(context.connectionParams); // payload from connection_init
      },
    },
  },
};
📖 Setting static context value

An object can be provided via the context attribute when calling createInstance.

const instance = createInstance({
  /* ... */
  context: {
    myAttr: 'hello',
  },
});

The default values (above) will be appended to this object prior to execution.

📖 Setting dynamic context value

A function (optionally async) can be provided via the context attribute when calling createInstance.

The default context value is passed as an argument.

const instance = createInstance({
  /* ... */
  context: ({ connectionParams }) => ({
    myAttr: 'hello',
    user: connectionParams.user,
  }),
});
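To make the three context cases concrete (no option, static object, function), here is a sketch of how such an option could be resolved. It is not the library's implementation: resolveContext and ContextOption are hypothetical names, and returning a function's result unchanged is an assumption, since the text above only says the defaults are passed in as an argument.

```typescript
// Default context shape described above.
interface DefaultContext {
  connectionParams: Record<string, unknown>;
}

type ContextValue = Record<string, unknown>;
type ContextOption =
  | ContextValue
  | ((defaults: DefaultContext) => ContextValue | Promise<ContextValue>);

// Hypothetical resolver covering the three documented cases.
async function resolveContext(
  option: ContextOption | undefined,
  defaults: DefaultContext,
): Promise<ContextValue> {
  // No context option: the default value is used.
  if (option === undefined) return { ...defaults };
  // Function (optionally async): receives the defaults as its argument.
  // Assumption: its result is used as-is.
  if (typeof option === 'function') return option(defaults);
  // Static object: the default values are added to it.
  return { ...option, ...defaults };
}

const contextDefaults: DefaultContext = { connectionParams: { user: 'ada' } };

resolveContext({ myAttr: 'hello' }, contextDefaults).then((context) => {
  console.log(context); // static object plus connectionParams
});
```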

Side effects

Side effect handlers can be declared on subscription fields to handle onSubscribe (start) and onComplete (stop) events.

📖 Adding side-effect handlers
export const resolver = {
  Subscribe: {
    mySubscription: {
      resolve: (event, args, context) => {
        /* ... */
      },
      subscribe: subscribe('MY_TOPIC', {
        // filter?: object | ((...args: SubscribeArgs) => object)
        // onSubscribe?: (...args: SubscribeArgs) => void | Promise<void>
        // onComplete?: (...args: SubscribeArgs) => void | Promise<void>
        // onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
      }),
    },
  },
};
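Since onComplete can fire up to twice for the same subscription (see the API notes), cleanup with side effects is easier to reason about when it is idempotent. The sketch below shows one way to guard a cleanup function; CompletionStore, createMemoryStore and completeOnce are hypothetical names, and the in-memory store is for demonstration only.

```typescript
// Hypothetical store: resolves true only the first time an id is marked.
interface CompletionStore {
  markCompleted(subscriptionId: string): Promise<boolean>;
}

// In-memory store for demonstration only. Lambda invocations do not share memory,
// so a real deployment would need a shared store with an atomic "mark if absent".
function createMemoryStore(): CompletionStore {
  const seen = new Set<string>();
  return {
    async markCompleted(subscriptionId: string): Promise<boolean> {
      if (seen.has(subscriptionId)) return false;
      seen.add(subscriptionId);
      return true;
    },
  };
}

// Wraps a cleanup function so repeated calls for the same id run it only once.
function completeOnce(
  store: CompletionStore,
  cleanup: (subscriptionId: string) => Promise<void>,
): (subscriptionId: string) => Promise<void> {
  return async (subscriptionId: string): Promise<void> => {
    if (await store.markCompleted(subscriptionId)) {
      await cleanup(subscriptionId);
    }
  };
}

const cleaned: string[] = [];
const onCompleteOnce = completeOnce(createMemoryStore(), async (id) => {
  cleaned.push(id);
});

// Simulate a "complete" message and a disconnect arriving for the same subscription.
Promise.all([onCompleteOnce('sub-1'), onCompleteOnce('sub-1')]).then(() => {
  console.log(cleaned); // the id appears once
});
```

In an onComplete handler the id would come from the subscription information passed to the function; how that id is obtained is left out here.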

Events

Global events can be provided when calling createInstance to track the execution cycle of the lambda.

📖 Connect (onConnect)

Called when a WebSocket connection is first established.

const instance = createInstance({
  /* ... */
  onConnect: ({ event }) => {
    /* */
  },
});
📖 Disconnect (onDisconnect)

Called when a WebSocket connection is disconnected.

const instance = createInstance({
  /* ... */
  onDisconnect: ({ event }) => {
    /* */
  },
});
📖 Authorization (connection_init)

onConnectionInit can be used to verify the connection_init payload prior to persistence.

Note: Any sensitive data in the incoming message should be removed at this stage.

const instance = createInstance({
  /* ... */
  onConnectionInit: ({ message }) => {
    const token = message.payload.token;

    if (!myValidation(token)) {
      throw Error('Token validation failed');
    }

    // Prevent sensitive data from being written to DB
    return {
      ...message.payload,
      token: undefined,
    };
  },
});
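The validation step above can also be written as a standalone function that drops the token key instead of setting it to undefined, which avoids relying on how undefined values are serialized. This is a sketch only; sanitizeConnectionParams and isValidToken are hypothetical names, and the returned object is what an onConnectionInit handler would return.

```typescript
type ConnectionParams = Record<string, unknown>;

// Hypothetical helper: validates the token, then returns the payload without it.
function sanitizeConnectionParams(
  payload: ConnectionParams,
  isValidToken: (token: unknown) => boolean,
): ConnectionParams {
  const { token, ...rest } = payload;
  if (!isValidToken(token)) {
    throw new Error('Token validation failed');
  }
  // The token key is omitted entirely rather than set to undefined.
  return rest;
}

const safeParams = sanitizeConnectionParams(
  { token: 'secret', locale: 'en' },
  (token) => token === 'secret', // stand-in for real validation
);
console.log(safeParams); // { locale: 'en' }
console.log('token' in safeParams); // false
```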

By default, the (optionally parsed) payload will be accessible via context.

📖 Subscribe (onSubscribe)

Called when any subscription message is received.

const instance = createInstance({
  /* ... */
  onSubscribe: ({ event, message }) => {
    /* */
  },
});
📖 Complete (onComplete)

Called when any complete message is received.

const instance = createInstance({
  /* ... */
  onComplete: ({ event, message }) => {
    /* */
  },
});
📖 Error (onError)

Called when any error is encountered.

const instance = createInstance({
  /* ... */
  onError: (error, context) => {
    /* */
  },
});