diff --git a/apps/web/app/api/cron/links/webhooks-created/route.ts b/apps/web/app/api/cron/links/webhooks-created/route.ts new file mode 100644 index 0000000000..a45bc0dd12 --- /dev/null +++ b/apps/web/app/api/cron/links/webhooks-created/route.ts @@ -0,0 +1,100 @@ +import { handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { linkCache } from "@/lib/api/links/cache"; +import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { isLinkLevelWebhook } from "@/lib/webhook/utils"; +import { prisma } from "@dub/prisma"; +import { z } from "zod"; + +export const dynamic = "force-dynamic"; + +const schema = z.object({ + webhookId: z.string(), +}); + +// This route is used to sync webhooks with the links cache +// This will be called only for link-level webhooks +export async function POST(req: Request) { + try { + const rawBody = await req.text(); + await verifyQstashSignature({ req, rawBody }); + + const { webhookId } = schema.parse(JSON.parse(rawBody)); + + const webhook = await prisma.webhook.findUnique({ + where: { + id: webhookId, + }, + }); + + if (!webhook) { + return new Response("Webhook not found."); + } + + if (!isLinkLevelWebhook(webhook)) { + return new Response("Webhook is not a link-level webhook."); + } + + const linksCount = await prisma.linkWebhook.groupBy({ + by: ["enabled"], + where: { + webhookId, + }, + _count: true, + }); + + const includedLinksCount = + linksCount.find((link) => link.enabled)?._count || 0; + const excludedLinksCount = + linksCount.find((link) => !link.enabled)?._count || 0; + + // Include specific links + if (includedLinksCount > 0) { + let skip = 0; + const take = 1000; + + while (true) { + const linkWebhooks = await prisma.linkWebhook.findMany({ + where: { + webhookId, + enabled: true, + }, + skip, + take, + }); + + if (linkWebhooks.length === 0) { + break; + } + + const links = await prisma.link.findMany({ + where: { + id: { + in: linkWebhooks.map((link) => link.linkId), + }, + }, + include: { + tags: { + include: { + tag: true, + }, + }, + webhooks: true, + }, + }); + + await linkCache.mset(links); + + skip += take; + } + } + + // Exclude specific links + if (excludedLinksCount > 0) { + // + } + + return new Response("OK"); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/app/api/cron/webhooks/deleted/route.ts b/apps/web/app/api/cron/webhooks/deleted/route.ts new file mode 100644 index 0000000000..6cd2e1822e --- /dev/null +++ b/apps/web/app/api/cron/webhooks/deleted/route.ts @@ -0,0 +1,85 @@ +import { handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { linkCache } from "@/lib/api/links/cache"; +import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { isLinkLevelWebhook } from "@/lib/webhook/utils"; +import { prisma } from "@dub/prisma"; +import { z } from "zod"; + +export const dynamic = "force-dynamic"; + +const schema = z.object({ + webhookId: z.string(), +}); + +// This route is used to sync webhooks with the links cache when a webhook is deleted +export async function POST(req: Request) { + try { + const rawBody = await req.text(); + await verifyQstashSignature({ req, rawBody }); + + const { webhookId } = schema.parse(JSON.parse(rawBody)); + + const linksCount = await prisma.linkWebhook.groupBy({ + by: ["enabled"], + where: { + webhookId, + }, + _count: true, + }); + + const includedLinksCount = + linksCount.find((link) => link.enabled)?._count || 0; + const excludedLinksCount = + linksCount.find((link) => !link.enabled)?._count || 0; + + // Include specific links + if (includedLinksCount > 0) { + let skip = 0; + const take = 1000; + + while (true) { + const linkWebhooks = await prisma.linkWebhook.findMany({ + where: { + webhookId, + enabled: true, + }, + skip, + take, + }); + + if (linkWebhooks.length === 0) { + break; + } + + const links = await prisma.link.findMany({ + where: { + id: { + in: linkWebhooks.map((link) => link.linkId), + }, + }, + include: { + tags: { + include: { + tag: true, + }, + }, + webhooks: true, + }, + }); + + await linkCache.mset(links); + + skip += take; + } + } + + // Exclude specific links + if (excludedLinksCount > 0) { + // + } + + return new Response("OK"); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/app/api/webhooks/[webhookId]/route.ts b/apps/web/app/api/webhooks/[webhookId]/route.ts index 9e79b951bf..1655d61b70 100644 --- a/apps/web/app/api/webhooks/[webhookId]/route.ts +++ b/apps/web/app/api/webhooks/[webhookId]/route.ts @@ -272,15 +272,6 @@ export const DELETE = withWorkspace( }); } - const linkWebhooks = await prisma.linkWebhook.findMany({ - where: { - webhookId, - }, - select: { - linkId: true, - }, - }); - await prisma.webhook.delete({ where: { id: webhookId, @@ -288,28 +279,16 @@ export const DELETE = withWorkspace( }); waitUntil( - (async () => { - const links = await prisma.link.findMany({ - where: { - id: { in: linkWebhooks.map(({ linkId }) => linkId) }, - }, - include: { - webhooks: { - select: { - webhookId: true, - }, - }, - }, - }); + Promise.all([ + toggleWebhooksForWorkspace({ + workspaceId: workspace.id, + }), - await Promise.all([ - toggleWebhooksForWorkspace({ - workspaceId: workspace.id, - }), - linkCache.mset(links), - webhookCache.delete(webhookId), - ]); - })(), + webhookCache.delete(webhookId), + + // TODO: + // Propagate webhook trigger changes to the links cache if the webhook is a "link-level" webhook + ]), ); return NextResponse.json({ diff --git a/apps/web/app/api/webhooks/route.ts b/apps/web/app/api/webhooks/route.ts index d836b67e06..a796c8d0a5 100644 --- a/apps/web/app/api/webhooks/route.ts +++ b/apps/web/app/api/webhooks/route.ts @@ -1,16 +1,11 @@ import { DubApiError } from "@/lib/api/errors"; -import { linkCache } from "@/lib/api/links/cache"; import { parseRequestBody } from "@/lib/api/utils"; import { withWorkspace } from "@/lib/auth"; import { getFolders } from "@/lib/folder/get-folders"; -import { webhookCache } from "@/lib/webhook/cache"; import { createWebhook } from "@/lib/webhook/create-webhook"; import { transformWebhook } from "@/lib/webhook/transform"; import { toggleWebhooksForWorkspace } from "@/lib/webhook/update-webhook"; -import { - identifyWebhookReceiver, - isLinkLevelWebhook, -} from "@/lib/webhook/utils"; +import { identifyWebhookReceiver } from "@/lib/webhook/utils"; import { createWebhookSchema } from "@/lib/zod/schemas/webhooks"; import { sendEmail } from "@dub/email"; import { WebhookAdded } from "@dub/email/templates/webhook-added"; @@ -61,9 +56,8 @@ export const GET = withWorkspace( // POST /api/webhooks/ - create a new webhook export const POST = withWorkspace( async ({ req, workspace, session }) => { - const { name, url, triggers, linkIds, secret } = createWebhookSchema.parse( - await parseRequestBody(req), - ); + const { name, url, triggers, linkIds, excludeLinkIds, secret } = + createWebhookSchema.parse(await parseRequestBody(req)); const existingWebhook = await prisma.webhook.findFirst({ where: { @@ -79,7 +73,9 @@ export const POST = withWorkspace( }); } - if (linkIds && linkIds.length > 0) { + const allLinkIds = [...(linkIds || []), ...(excludeLinkIds || [])]; + + if (allLinkIds.length > 0) { const folders = await getFolders({ workspaceId: workspace.id, userId: session.user.id, @@ -88,7 +84,7 @@ export const POST = withWorkspace( const links = await prisma.link.findMany({ where: { id: { - in: linkIds, + in: allLinkIds, }, projectId: workspace.id, OR: [ @@ -101,7 +97,7 @@ export const POST = withWorkspace( }, }); - if (links.length !== linkIds.length) { + if (links.length !== allLinkIds.length) { throw new DubApiError({ code: "bad_request", message: @@ -132,6 +128,7 @@ export const POST = withWorkspace( receiver: isZapierWebhook ? WebhookReceiver.zapier : WebhookReceiver.user, triggers, linkIds, + excludeLinkIds, secret, workspace, installationId: zapierInstallation ? zapierInstallation.id : undefined, @@ -145,44 +142,26 @@ export const POST = withWorkspace( } waitUntil( - (async () => { - const links = await prisma.link.findMany({ - where: { - id: { in: linkIds }, - projectId: workspace.id, - }, - include: { - webhooks: { - select: { - webhookId: true, - }, - }, - }, - }); - - Promise.allSettled([ - toggleWebhooksForWorkspace({ - workspaceId: workspace.id, - }), - sendEmail({ + Promise.all([ + toggleWebhooksForWorkspace({ + workspaceId: workspace.id, + }), + + sendEmail({ + email: session.user.email, + subject: "New webhook added", + react: WebhookAdded({ email: session.user.email, - subject: "New webhook added", - react: WebhookAdded({ - email: session.user.email, - workspace: { - name: workspace.name, - slug: workspace.slug, - }, - webhook: { - name, - }, - }), + workspace: { + name: workspace.name, + slug: workspace.slug, + }, + webhook: { + name, + }, }), - ...(links && links.length > 0 ? [linkCache.mset(links), []] : []), - - ...(isLinkLevelWebhook(webhook) ? [webhookCache.set(webhook)] : []), - ]); - })(), + }), + ]), ); return NextResponse.json(transformWebhook(webhook), { status: 201 }); diff --git a/apps/web/lib/webhook/create-webhook.ts b/apps/web/lib/webhook/create-webhook.ts index 33242681eb..3243c9f8c2 100644 --- a/apps/web/lib/webhook/create-webhook.ts +++ b/apps/web/lib/webhook/create-webhook.ts @@ -1,5 +1,4 @@ import { createId } from "@/lib/api/create-id"; -import { linkCache } from "@/lib/api/links/cache"; import { webhookCache } from "@/lib/webhook/cache"; import { WEBHOOK_ID_PREFIX } from "@/lib/webhook/constants"; import { isLinkLevelWebhook } from "@/lib/webhook/utils"; @@ -16,6 +15,7 @@ export async function createWebhook({ secret, triggers, linkIds, + excludeLinkIds, workspace, receiver, installationId, @@ -40,12 +40,16 @@ export async function createWebhook({ projectId: workspace.id, secret: secret || createWebhookSecret(), links: { - ...(linkIds && - linkIds.length > 0 && { - create: linkIds.map((linkId) => ({ - linkId, - })), - }), + create: [ + ...(linkIds?.map((linkId) => ({ + linkId, + enabled: true, + })) || []), + ...(excludeLinkIds?.map((linkId) => ({ + linkId, + enabled: false, + })) || []), + ], }, }, select: { @@ -71,34 +75,9 @@ export async function createWebhook({ waitUntil( (async () => { - const links = await prisma.link.findMany({ - where: { - id: { in: linkIds }, - projectId: workspace.id, - }, - include: { - webhooks: { - select: { - webhookId: true, - }, - }, - }, - }); - - const formatedLinks = links.map((link) => { - return { - ...link, - webhookIds: link.webhooks.map((webhook) => webhook.webhookId), - }; - }); - - Promise.all([ - ...(links && links.length > 0 - ? [linkCache.mset(formatedLinks), []] - : []), - - ...(isLinkLevelWebhook(webhook) ? [webhookCache.set(webhook)] : []), - ]); + if (isLinkLevelWebhook(webhook)) { + await webhookCache.set(webhook); + } })(), ); diff --git a/apps/web/lib/zod/schemas/webhooks.ts b/apps/web/lib/zod/schemas/webhooks.ts index 38fe9fdbc0..dcffeeee03 100644 --- a/apps/web/lib/zod/schemas/webhooks.ts +++ b/apps/web/lib/zod/schemas/webhooks.ts @@ -10,6 +10,7 @@ export const WebhookSchema = z.object({ triggers: z.array(z.enum(WEBHOOK_TRIGGERS)), disabledAt: z.date().nullable(), linkIds: z.array(z.string()).optional(), + excludeLinkIds: z.array(z.string()).optional(), installationId: z.string().nullable(), }); @@ -18,7 +19,18 @@ export const createWebhookSchema = z.object({ url: parseUrlSchema, secret: z.string().optional(), triggers: z.array(z.enum(WEBHOOK_TRIGGERS)), - linkIds: z.array(z.string()).optional(), + linkIds: z + .array(z.string()) + .optional() + .describe( + "Link IDs to include. If not provided, all links will be included.", + ), + excludeLinkIds: z + .array(z.string()) + .optional() + .describe( + "Link IDs to exclude. If not provided, all links will be included.", + ), }); export const updateWebhookSchema = createWebhookSchema.partial(); diff --git a/packages/prisma/schema/webhook.prisma b/packages/prisma/schema/webhook.prisma index e1b345e9a5..bd993d6339 100644 --- a/packages/prisma/schema/webhook.prisma +++ b/packages/prisma/schema/webhook.prisma @@ -29,12 +29,13 @@ model Webhook { } model LinkWebhook { - id String @id @default(cuid()) + id String @id @default(cuid()) linkId String webhookId String + enabled Boolean @default(true) link Link @relation(fields: [linkId], references: [id], onUpdate: Cascade, onDelete: Cascade) - webhook Webhook @relation(fields: [webhookId], references: [id], onUpdate: Cascade, onDelete: Cascade) + webhook Webhook @relation(fields: [webhookId], references: [id], onUpdate: Cascade) @@unique([linkId, webhookId]) @@index(webhookId)