Skip to content

Async Batcher #56423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/next/src/build/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import '../server/require-hook'
import '../server/node-polyfill-fetch'
import '../server/node-polyfill-crypto'
import '../server/node-environment'
import '../lib/polyfill-promise-with-resolvers'

import { green, yellow, red, cyan, bold, underline } from '../lib/picocolors'
import getGzipSize from 'next/dist/compiled/gzip-size'
Expand Down
1 change: 1 addition & 0 deletions packages/next/src/export/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
import '../server/node-polyfill-fetch'
import '../server/node-polyfill-web-streams'
import '../server/node-environment'
import '../lib/polyfill-promise-with-resolvers'

process.env.NEXT_IS_EXPORT_WORKER = 'true'

Expand Down
78 changes: 78 additions & 0 deletions packages/next/src/lib/batcher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Batcher } from './batcher'

describe('Batcher', () => {
describe('batch', () => {
it('should execute the work function immediately', async () => {
const batcher = Batcher.create<string, number>()
const workFn = jest.fn().mockResolvedValue(42)

const result = await batcher.batch('key', workFn)

expect(result).toBe(42)
expect(workFn).toHaveBeenCalledTimes(1)
})

it('should batch multiple calls to the same key', async () => {
const batcher = Batcher.create<string, number>()
const workFn = jest.fn().mockResolvedValue(42)

const result1 = batcher.batch('key', workFn)
const result2 = batcher.batch('key', workFn)

expect(result1).toBeInstanceOf(Promise)
expect(result2).toBeInstanceOf(Promise)
expect(workFn).toHaveBeenCalledTimes(1)

const [value1, value2] = await Promise.all([result1, result2])

expect(value1).toBe(42)
expect(value2).toBe(42)
expect(workFn).toHaveBeenCalledTimes(1)
})

it('should not batch calls to different keys', async () => {
const batcher = Batcher.create<string, string>()
const workFn = jest.fn((key) => key)

const result1 = batcher.batch('key1', workFn)
const result2 = batcher.batch('key2', workFn)

expect(result1).toBeInstanceOf(Promise)
expect(result2).toBeInstanceOf(Promise)
expect(workFn).toHaveBeenCalledTimes(2)

const [value1, value2] = await Promise.all([result1, result2])

expect(value1).toBe('key1')
expect(value2).toBe('key2')
expect(workFn).toHaveBeenCalledTimes(2)
})

it('should use the cacheKeyFn to generate cache keys', async () => {
const cacheKeyFn = jest.fn().mockResolvedValue('cache-key')
const batcher = Batcher.create<string, number>({ cacheKeyFn })
const workFn = jest.fn().mockResolvedValue(42)

const result = await batcher.batch('key', workFn)

expect(result).toBe(42)
expect(cacheKeyFn).toHaveBeenCalledWith('key')
expect(workFn).toHaveBeenCalledTimes(1)
})

it('should use the schedulerFn to schedule work', async () => {
const schedulerFn = jest.fn().mockImplementation((fn) => fn())
const batcher = Batcher.create<string, number>({ schedulerFn })
const workFn = jest.fn().mockResolvedValue(42)

const results = await Promise.all([
batcher.batch('key', workFn),
batcher.batch('key', workFn),
batcher.batch('key', workFn),
])

expect(results).toEqual([42, 42, 42])
expect(workFn).toHaveBeenCalledTimes(1)
})
})
})
94 changes: 94 additions & 0 deletions packages/next/src/lib/batcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// This takes advantage of `Promise.withResolvers` which is polyfilled in
// this imported module.
import './polyfill-promise-with-resolvers'

import { SchedulerFn } from '../server/lib/schedule-on-next-tick'

type CacheKeyFn<K, C extends string | number | null> = (
key: K
) => PromiseLike<C> | C

type BatcherOptions<K, C extends string | number | null> = {
cacheKeyFn?: CacheKeyFn<K, C>
schedulerFn?: SchedulerFn<void>
}

type WorkFn<V, C> = (
key: C,
resolve: (value: V | PromiseLike<V>) => void
) => Promise<V>

/**
* A wrapper for a function that will only allow one call to the function to
* execute at a time.
*/
export class Batcher<K, V, C extends string | number | null> {
private readonly pending = new Map<C, Promise<V>>()

protected constructor(
private readonly cacheKeyFn?: CacheKeyFn<K, C>,
/**
* A function that will be called to schedule the wrapped function to be
* executed. This defaults to a function that will execute the function
* immediately.
*/
private readonly schedulerFn: SchedulerFn<void> = (fn) => fn()
) {}

/**
* Creates a new instance of PendingWrapper. If the key extends a string or
* number, the key will be used as the cache key. If the key is an object, a
* cache key function must be provided.
*/
public static create<K extends string | number | null, V>(
options?: BatcherOptions<K, K>
): Batcher<K, V, K>
public static create<K, V, C extends string | number | null>(
options: BatcherOptions<K, C> &
Required<Pick<BatcherOptions<K, C>, 'cacheKeyFn'>>
): Batcher<K, V, C>
public static create<K, V, C extends string | number | null>(
options?: BatcherOptions<K, C>
): Batcher<K, V, C> {
return new Batcher<K, V, C>(options?.cacheKeyFn, options?.schedulerFn)
}

/**
* Wraps a function in a promise that will be resolved or rejected only once
* for a given key. This will allow multiple calls to the function to be
* made, but only one will be executed at a time. The result of the first
* call will be returned to all callers.
*
* @param key the key to use for the cache
* @param fn the function to wrap
* @returns a promise that resolves to the result of the function
*/
public async batch(key: K, fn: WorkFn<V, C>): Promise<V> {
const cacheKey = (this.cacheKeyFn ? await this.cacheKeyFn(key) : key) as C
if (cacheKey === null) {
return fn(cacheKey, Promise.resolve)
}

const pending = this.pending.get(cacheKey)
if (pending) return pending

const { promise, resolve, reject } = Promise.withResolvers<V>()
this.pending.set(cacheKey, promise)

this.schedulerFn(async () => {
try {
const result = await fn(cacheKey, resolve)

// Resolving a promise multiple times is a no-op, so we can safely
// resolve all pending promises with the same result.
resolve(result)
} catch (err) {
reject(err)
} finally {
this.pending.delete(cacheKey)
}
})

return promise
}
}
27 changes: 27 additions & 0 deletions packages/next/src/lib/polyfill-promise-with-resolvers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// This adds a `Promise.withResolvers` polyfill. This will soon be adopted into
// the spec.
//
// TODO: remove this polyfill when it is adopted into the spec.
//
// https://tc39.es/proposal-promise-with-resolvers/
//
if (
!('withResolvers' in Promise) ||
typeof Promise.withResolvers !== 'function'
) {
Promise.withResolvers = <T>() => {
let resolvers: {
resolve: (value: T | PromiseLike<T>) => void
reject: (reason: any) => void
}

// Create the promise and assign the resolvers to the object.
const promise = new Promise<T>((resolve, reject) => {
resolvers = { resolve, reject }
})

// We know that resolvers is defined because the Promise constructor runs
// synchronously.
return { promise, resolve: resolvers!.resolve, reject: resolvers!.reject }
}
}
2 changes: 1 addition & 1 deletion packages/next/src/lib/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker as JestWorker } from 'next/dist/compiled/jest-worker'
import { getNodeOptionsWithoutInspect } from '../server/lib/utils'

// We need this as we're using `Promise.withResolvers` which is not available in the node typings
import '../server/node-environment'
import '../lib/polyfill-promise-with-resolvers'

type FarmOptions = ConstructorParameters<typeof JestWorker>[1]

Expand Down
71 changes: 29 additions & 42 deletions packages/next/src/server/dev/on-demand-entry-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import HotReloader from './hot-reloader-webpack'
import { isAppPageRouteDefinition } from '../future/route-definitions/app-page-route-definition'
import { scheduleOnNextTick } from '../lib/schedule-on-next-tick'
import { RouteDefinition } from '../future/route-definitions/route-definition'
import { Batcher } from '../../lib/batcher'

const debug = origDebug('next:on-demand-entry-handler')

Expand Down Expand Up @@ -878,8 +879,28 @@ export function onDemandEntryHandler({
}
}

type EnsurePageOptions = {
page: string
clientOnly: boolean
appPaths?: ReadonlyArray<string> | null
match?: RouteMatch
isApp?: boolean
}

// Make sure that we won't have multiple invalidations ongoing concurrently.
const curEnsurePage = new Map<string, Promise<void>>()
const batcher = Batcher.create<EnsurePageOptions, void, string>({
// The cache key here is composed of the elements that affect the
// compilation, namely, the page, whether it's client only, and whether
// it's an app page. This ensures that we don't have multiple compilations
// for the same page happening concurrently.
//
// We don't include the whole match because it contains match specific
// parameters (like route params) that would just bust this cache. Any
// details that would possibly bust the cache should be listed here.
cacheKeyFn: (options) => JSON.stringify(options),
// Schedule the invocation of the ensurePageImpl function on the next tick.
schedulerFn: scheduleOnNextTick,
})

return {
async ensurePage({
Expand All @@ -888,13 +909,7 @@ export function onDemandEntryHandler({
appPaths = null,
match,
isApp,
}: {
page: string
clientOnly: boolean
appPaths?: ReadonlyArray<string> | null
match?: RouteMatch
isApp?: boolean
}) {
}: EnsurePageOptions) {
// If the route is actually an app page route, then we should have access
// to the app route match, and therefore, the appPaths from it.
if (
Expand All @@ -905,43 +920,15 @@ export function onDemandEntryHandler({
appPaths = match.definition.appPaths
}

// The cache key here is composed of the elements that affect the
// compilation, namely, the page, whether it's client only, and whether
// it's an app page. This ensures that we don't have multiple compilations
// Wrap the invocation of the ensurePageImpl function in the pending
// wrapper, which will ensure that we don't have multiple compilations
// for the same page happening concurrently.
//
// We don't include the whole match because it contains match specific
// parameters (like route params) that would just bust this cache. Any
// details that would possibly bust the cache should be listed here.
const key = JSON.stringify({
page,
clientOnly,
appPaths,
definition: match?.definition,
isApp,
})

// See if we're already building this page.
const pending = curEnsurePage.get(key)
if (pending) return pending

const { promise, resolve, reject } = Promise.withResolvers<void>()
curEnsurePage.set(key, promise)

// Schedule the build to occur on the next tick, but don't wait and
// instead return the promise immediately.
scheduleOnNextTick(async () => {
try {
return batcher.batch(
{ page, clientOnly, appPaths, match, isApp },
async () => {
await ensurePageImpl({ page, clientOnly, appPaths, match, isApp })
resolve()
} catch (err) {
reject(err)
} finally {
curEnsurePage.delete(key)
}
})

return promise
)
},
onHMR(client: ws, getHmrServerError: () => Error | null) {
let bufferedHmrServerError: Error | null = null
Expand Down
1 change: 1 addition & 0 deletions packages/next/src/server/dev/static-paths-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { NextConfigComplete } from '../config-shared'
import '../require-hook'
import '../node-polyfill-fetch'
import '../node-environment'
import '../../lib/polyfill-promise-with-resolvers'

import {
buildAppStaticPaths,
Expand Down
1 change: 1 addition & 0 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { WorkerRequestHandler, WorkerUpgradeHandler } from './types'
import '../node-polyfill-fetch'
import '../node-environment'
import '../require-hook'
import '../../lib/polyfill-promise-with-resolvers'

import url from 'url'
import path from 'path'
Expand Down
5 changes: 3 additions & 2 deletions packages/next/src/server/lib/schedule-on-next-tick.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
type ScheduledFn<T = void> = () => T | PromiseLike<T>
export type ScheduledFn<T = void> = () => T | PromiseLike<T>
export type SchedulerFn<T = void> = (cb: ScheduledFn<T>) => void

/**
* Schedules a function to be called on the next tick after the other promises
* have been resolved.
*/
export function scheduleOnNextTick<T = void>(cb: ScheduledFn<T>): void {
export const scheduleOnNextTick = <T = void>(cb: ScheduledFn<T>): void => {
// We use Promise.resolve().then() here so that the operation is scheduled at
// the end of the promise job queue, we then add it to the next process tick
// to ensure it's evaluated afterwards.
Expand Down
1 change: 1 addition & 0 deletions packages/next/src/server/next-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import './node-polyfill-fetch'
import './node-polyfill-form'
import './node-polyfill-web-streams'
import './node-polyfill-crypto'
import '../lib/polyfill-promise-with-resolvers'

import type { TLSSocket } from 'tls'
import {
Expand Down
28 changes: 0 additions & 28 deletions packages/next/src/server/node-environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,3 @@ if (typeof (globalThis as any).WebSocket !== 'function') {
},
})
}

// This adds a `Promise.withResolvers` polyfill. This will soon be adopted into
// the spec.
//
// TODO: remove this polyfill when it is adopted into the spec.
//
// https://tc39.es/proposal-promise-with-resolvers/
//
if (
!('withResolvers' in Promise) ||
typeof Promise.withResolvers !== 'function'
) {
Promise.withResolvers = <T>() => {
let resolvers: {
resolve: (value: T | PromiseLike<T>) => void
reject: (reason: any) => void
}

// Create the promise and assign the resolvers to the object.
const promise = new Promise<T>((resolve, reject) => {
resolvers = { resolve, reject }
})

// We know that resolvers is defined because the Promise constructor runs
// synchronously.
return { promise, resolve: resolvers!.resolve, reject: resolvers!.reject }
}
}
Loading