Skip to content

feat(previews): retry errored previews if feature flag enabled #4498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion packages/server/modules/previews/domain/operations.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,46 @@
import type { ObjectPreview } from '@/modules/previews/domain/types'
import type { Nullable, Optional, PartialBy } from '@speckle/shared'
import type {
MaybeNullOrUndefined,
Nullable,
Optional,
PartialBy
} from '@speckle/shared'
import type { Request, Response } from 'express'
import { PreviewStatus } from '@/modules/previews/domain/consts'

export type GetObjectPreviewInfo = (params: {
streamId: string
objectId: string
}) => Promise<Optional<ObjectPreview>>

export type GetPaginatedObjectPreviewsInErrorState = (params: {
limit: number
cursor?: MaybeNullOrUndefined<string>
}) => Promise<{
totalCount: number
items: ObjectPreview[]
cursor: string | null
}>

export type PaginatedObjectPreviewsParams = {
limit: number
cursor?: MaybeNullOrUndefined<string>
filter?: MaybeNullOrUndefined<{
status: (typeof PreviewStatus)[keyof typeof PreviewStatus]
}>
}

export type GetPaginatedObjectPreviewsPage = (
params: PaginatedObjectPreviewsParams
) => Promise<{
items: ObjectPreview[]
cursor: string | null
}>

export type GetPaginatedObjectPreviewsTotalCount = (
params: Omit<PaginatedObjectPreviewsParams, 'limit' | 'cursor'>
) => Promise<number>

export type CreateObjectPreview = (
params: Pick<ObjectPreview, 'streamId' | 'objectId' | 'priority'>
) => Promise<boolean>
Expand Down
131 changes: 128 additions & 3 deletions packages/server/modules/previews/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
/* istanbul ignore file */
import cron from 'node-cron'
import { moduleLogger, previewLogger as logger } from '@/observability/logging'
import { consumePreviewResultFactory } from '@/modules/previews/resultListener'

import { db } from '@/db/knex'
import {
disablePreviews,
getFeatureFlags,
getPreviewServiceRedisUrl,
getPrivateObjectsServerOrigin,
getRedisUrl,
getServerOrigin
getServerOrigin,
previewServiceShouldUsePrivateObjectsServerUrl
} from '@/modules/shared/helpers/envHelper'
import Bull, { type QueueOptions } from 'bull'
import Bull, { type Queue, type QueueOptions } from 'bull'
import Redis, { type RedisOptions } from 'ioredis'
import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
Expand All @@ -22,6 +26,8 @@ import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { previewResultPayload } from '@speckle/shared/dist/commonjs/previews/job.js'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
getPaginatedObjectPreviewsPageFactory,
getPaginatedObjectPreviewsTotalCountFactory,
storePreviewFactory,
upsertObjectPreviewFactory
} from '@/modules/previews/repository/previews'
Expand All @@ -30,7 +36,103 @@ import {
initializeMetrics,
PreviewJobDurationStep
} from '@/modules/previews/observability/metrics'
import { addRequestQueueListeners } from '@/modules/previews/queues/previews'
import {
addRequestQueueListeners,
requestObjectPreviewFactory
} from '@/modules/previews/queues/previews'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import {
acquireTaskLockFactory,
releaseTaskLockFactory
} from '@/modules/core/repositories/scheduledTasks'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import {
getPaginatedObjectPreviewInErrorStateFactory,
retryFailedPreviewsFactory
} from '@/modules/previews/services/tasks'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { createAppTokenFactory } from '@/modules/core/services/tokens'
import {
storeApiTokenFactory,
storeTokenResourceAccessDefinitionsFactory,
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'

const { FF_RETRY_ERRORED_PREVIEWS_ENABLED } = getFeatureFlags()

let scheduledTasks: cron.ScheduledTask[] = []

const scheduleRetryFailedPreviews = async ({
scheduleExecution,
previewRequestQueue,
responseQueueName
}: {
scheduleExecution: ScheduleExecution
previewRequestQueue: Queue
responseQueueName: string
}) => {
let previewResurrectionHandlers: {
handler: ReturnType<typeof retryFailedPreviewsFactory>
cursor: string | null
}[] = []
const regionClients = await getRegisteredDbClients()
for (const projectDb of [db, ...regionClients]) {
previewResurrectionHandlers.push({
handler: retryFailedPreviewsFactory({
getPaginatedObjectPreviewsInErrorState:
getPaginatedObjectPreviewInErrorStateFactory({
getPaginatedObjectPreviewsPage: getPaginatedObjectPreviewsPageFactory({
db: projectDb
}),
getPaginatedObjectPreviewsTotalCount:
getPaginatedObjectPreviewsTotalCountFactory({
db: projectDb
})
}),
upsertObjectPreview: upsertObjectPreviewFactory({
db: projectDb
}),
requestObjectPreview: requestObjectPreviewFactory({
queue: previewRequestQueue,
responseQueue: responseQueueName
}),
serverOrigin: previewServiceShouldUsePrivateObjectsServerUrl()
? getPrivateObjectsServerOrigin()
: getServerOrigin(),
getStreamCollaborators: getStreamCollaboratorsFactory({ db }),
createAppToken: createAppTokenFactory({
storeApiToken: storeApiTokenFactory({ db }),
storeTokenScopes: storeTokenScopesFactory({ db }),
storeTokenResourceAccessDefinitions:
storeTokenResourceAccessDefinitionsFactory({
db
}),
storeUserServerAppToken: storeUserServerAppTokenFactory({ db })
})
}),
cursor: null
})
}

const cronExpression = '*/5 * * * *' // every 5 minutes
return scheduleExecution(
cronExpression,
'PreviewResurrection',
async (_scheduledTime, { logger }) => {
previewResurrectionHandlers = await Promise.all(
previewResurrectionHandlers.map(async ({ handler, cursor }) => {
const newCursor = await handler({
logger,
previousCursor: cursor
})
return { handler, cursor: newCursor.cursor }
})
)
}
)
}
import { isRedisReady } from '@/modules/shared/redis/redis'

const JobQueueName = 'preview-service-jobs'
Expand Down Expand Up @@ -99,6 +201,11 @@ export const init: SpeckleModule['init'] = async ({
moduleLogger.info('📸 Init object preview module')
}

const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db }),
releaseTaskLock: releaseTaskLockFactory({ db })
})

const responseQueueName = `${ResponseQueueNamePrefix}-${
new URL(getServerOrigin()).hostname
}`
Expand All @@ -119,6 +226,18 @@ export const init: SpeckleModule['init'] = async ({
return
}

scheduledTasks = [
...(FF_RETRY_ERRORED_PREVIEWS_ENABLED
? [
await scheduleRetryFailedPreviews({
scheduleExecution,
previewRequestQueue,
responseQueueName
})
]
: [])
]

const { previewJobsProcessedSummary } = initializeMetrics({
registers: [metricsRegister],
previewRequestQueue,
Expand Down Expand Up @@ -216,3 +335,9 @@ export const init: SpeckleModule['init'] = async ({
}

export const finalize = () => {}

export const shutdown: SpeckleModule['shutdown'] = async () => {
scheduledTasks.forEach((task) => {
task.stop()
})
}
48 changes: 48 additions & 0 deletions packages/server/modules/previews/repository/previews.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { buildTableHelper } from '@/modules/core/dbSchema'
import {
GetObjectPreviewInfo,
GetPaginatedObjectPreviewsPage,
GetPaginatedObjectPreviewsTotalCount,
GetPreviewImage,
PaginatedObjectPreviewsParams,
StoreObjectPreview,
StorePreview,
UpsertObjectPreview
Expand All @@ -13,6 +16,7 @@ import {
import { Knex } from 'knex'
import { SetOptional } from 'type-fest'
import { PreviewStatus } from '@/modules/previews/domain/consts'
import { decodeCursor, encodeCursor } from '@/modules/shared/helpers/graphqlHelper'

const ObjectPreview = buildTableHelper('object_preview', [
'streamId',
Expand All @@ -39,6 +43,50 @@ export const getObjectPreviewInfoFactory =
.first()
}

export const getPaginatedObjectsPreviewsBaseQueryFactory =
(deps: { db: Knex }) =>
(params: Omit<PaginatedObjectPreviewsParams, 'limit' | 'cursor'>) => {
const query = tables.objectPreview(deps.db).select('*')

if (params.filter?.status) {
query.where('previewStatus', params.filter.status)
}

return query
}

export const getPaginatedObjectPreviewsPageFactory =
(deps: { db: Knex }): GetPaginatedObjectPreviewsPage =>
async (params) => {
const { limit, cursor } = params
const query = getPaginatedObjectsPreviewsBaseQueryFactory(deps)(params)
.orderBy('lastUpdate', 'desc') //newest first
.limit(limit)

if (cursor) {
query.where('lastUpdate', '<', decodeCursor(cursor)) //everything older than the cursor
}

const items = await query

return {
items,
cursor: items.length
? encodeCursor(items[items.length - 1].lastUpdate.toISOString())
: null
}
}

export const getPaginatedObjectPreviewsTotalCountFactory =
(deps: { db: Knex }): GetPaginatedObjectPreviewsTotalCount =>
async (params) => {
const baseQ = getPaginatedObjectsPreviewsBaseQueryFactory(deps)(params)
const q = deps.db.count<{ count: string }[]>().from(baseQ.as('sq1'))
const [row] = await q

return parseInt(row.count || '0')
}

/**
* @throws {Error} if the preview already exists
*/
Expand Down
Loading