Skip to content

Move link.clicked trigger to the workspace level. #2276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions apps/web/app/api/cron/links/webhooks-created/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { linkCache } from "@/lib/api/links/cache";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { isLinkLevelWebhook } from "@/lib/webhook/utils";
import { prisma } from "@dub/prisma";
import { z } from "zod";

export const dynamic = "force-dynamic";

const schema = z.object({
webhookId: z.string(),
});

// This route is used to sync webhooks with the links cache
// This will be called only for link-level webhooks
export async function POST(req: Request) {
try {
const rawBody = await req.text();
await verifyQstashSignature({ req, rawBody });

const { webhookId } = schema.parse(JSON.parse(rawBody));

const webhook = await prisma.webhook.findUnique({
where: {
id: webhookId,
},
});

if (!webhook) {
return new Response("Webhook not found.");
}

if (!isLinkLevelWebhook(webhook)) {
return new Response("Webhook is not a link-level webhook.");
}

const linksCount = await prisma.linkWebhook.groupBy({
by: ["enabled"],
where: {
webhookId,
},
_count: true,
});

const includedLinksCount =
linksCount.find((link) => link.enabled)?._count || 0;
const excludedLinksCount =
linksCount.find((link) => !link.enabled)?._count || 0;

// Include specific links
if (includedLinksCount > 0) {
let skip = 0;
const take = 1000;

while (true) {
const linkWebhooks = await prisma.linkWebhook.findMany({
where: {
webhookId,
enabled: true,
},
skip,
take,
});

if (linkWebhooks.length === 0) {
break;
}

const links = await prisma.link.findMany({
where: {
id: {
in: linkWebhooks.map((link) => link.linkId),
},
},
include: {
tags: {
include: {
tag: true,
},
},
webhooks: true,
},
});

await linkCache.mset(links);

skip += take;
}
}

// Exclude specific links
if (excludedLinksCount > 0) {
//
}

return new Response("OK");
} catch (error) {
return handleAndReturnErrorResponse(error);
}
}
85 changes: 85 additions & 0 deletions apps/web/app/api/cron/webhooks/deleted/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { linkCache } from "@/lib/api/links/cache";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { isLinkLevelWebhook } from "@/lib/webhook/utils";
import { prisma } from "@dub/prisma";
import { z } from "zod";

export const dynamic = "force-dynamic";

const schema = z.object({
webhookId: z.string(),
});

// This route is used to sync webhooks with the links cache when a webhook is deleted
export async function POST(req: Request) {
try {
const rawBody = await req.text();
await verifyQstashSignature({ req, rawBody });

const { webhookId } = schema.parse(JSON.parse(rawBody));

const linksCount = await prisma.linkWebhook.groupBy({
by: ["enabled"],
where: {
webhookId,
},
_count: true,
});

const includedLinksCount =
linksCount.find((link) => link.enabled)?._count || 0;
const excludedLinksCount =
linksCount.find((link) => !link.enabled)?._count || 0;

// Include specific links
if (includedLinksCount > 0) {
let skip = 0;
const take = 1000;

while (true) {
const linkWebhooks = await prisma.linkWebhook.findMany({
where: {
webhookId,
enabled: true,
},
skip,
take,
});

if (linkWebhooks.length === 0) {
break;
}

const links = await prisma.link.findMany({
where: {
id: {
in: linkWebhooks.map((link) => link.linkId),
},
},
include: {
tags: {
include: {
tag: true,
},
},
webhooks: true,
},
});

await linkCache.mset(links);

skip += take;
}
}

// Exclude specific links
if (excludedLinksCount > 0) {
//
}

return new Response("OK");
} catch (error) {
return handleAndReturnErrorResponse(error);
}
}
39 changes: 9 additions & 30 deletions apps/web/app/api/webhooks/[webhookId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,44 +272,23 @@ export const DELETE = withWorkspace(
});
}

const linkWebhooks = await prisma.linkWebhook.findMany({
where: {
webhookId,
},
select: {
linkId: true,
},
});

await prisma.webhook.delete({
where: {
id: webhookId,
},
});

waitUntil(
(async () => {
const links = await prisma.link.findMany({
where: {
id: { in: linkWebhooks.map(({ linkId }) => linkId) },
},
include: {
webhooks: {
select: {
webhookId: true,
},
},
},
});
Promise.all([
toggleWebhooksForWorkspace({
workspaceId: workspace.id,
}),

await Promise.all([
toggleWebhooksForWorkspace({
workspaceId: workspace.id,
}),
linkCache.mset(links),
webhookCache.delete(webhookId),
]);
})(),
webhookCache.delete(webhookId),

// TODO:
// Propagate webhook trigger changes to the links cache if the webhook is a "link-level" webhook
]),
);

return NextResponse.json({
Expand Down
75 changes: 27 additions & 48 deletions apps/web/app/api/webhooks/route.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { DubApiError } from "@/lib/api/errors";
import { linkCache } from "@/lib/api/links/cache";
import { parseRequestBody } from "@/lib/api/utils";
import { withWorkspace } from "@/lib/auth";
import { getFolders } from "@/lib/folder/get-folders";
import { webhookCache } from "@/lib/webhook/cache";
import { createWebhook } from "@/lib/webhook/create-webhook";
import { transformWebhook } from "@/lib/webhook/transform";
import { toggleWebhooksForWorkspace } from "@/lib/webhook/update-webhook";
import {
identifyWebhookReceiver,
isLinkLevelWebhook,
} from "@/lib/webhook/utils";
import { identifyWebhookReceiver } from "@/lib/webhook/utils";
import { createWebhookSchema } from "@/lib/zod/schemas/webhooks";
import { sendEmail } from "@dub/email";
import { WebhookAdded } from "@dub/email/templates/webhook-added";
Expand Down Expand Up @@ -61,9 +56,8 @@ export const GET = withWorkspace(
// POST /api/webhooks/ - create a new webhook
export const POST = withWorkspace(
async ({ req, workspace, session }) => {
const { name, url, triggers, linkIds, secret } = createWebhookSchema.parse(
await parseRequestBody(req),
);
const { name, url, triggers, linkIds, excludeLinkIds, secret } =
createWebhookSchema.parse(await parseRequestBody(req));

const existingWebhook = await prisma.webhook.findFirst({
where: {
Expand All @@ -79,7 +73,9 @@ export const POST = withWorkspace(
});
}

if (linkIds && linkIds.length > 0) {
const allLinkIds = [...(linkIds || []), ...(excludeLinkIds || [])];

if (allLinkIds.length > 0) {
const folders = await getFolders({
workspaceId: workspace.id,
userId: session.user.id,
Expand All @@ -88,7 +84,7 @@ export const POST = withWorkspace(
const links = await prisma.link.findMany({
where: {
id: {
in: linkIds,
in: allLinkIds,
},
projectId: workspace.id,
OR: [
Expand All @@ -101,7 +97,7 @@ export const POST = withWorkspace(
},
});

if (links.length !== linkIds.length) {
if (links.length !== allLinkIds.length) {
throw new DubApiError({
code: "bad_request",
message:
Expand Down Expand Up @@ -132,6 +128,7 @@ export const POST = withWorkspace(
receiver: isZapierWebhook ? WebhookReceiver.zapier : WebhookReceiver.user,
triggers,
linkIds,
excludeLinkIds,
secret,
workspace,
installationId: zapierInstallation ? zapierInstallation.id : undefined,
Expand All @@ -145,44 +142,26 @@ export const POST = withWorkspace(
}

waitUntil(
(async () => {
const links = await prisma.link.findMany({
where: {
id: { in: linkIds },
projectId: workspace.id,
},
include: {
webhooks: {
select: {
webhookId: true,
},
},
},
});

Promise.allSettled([
toggleWebhooksForWorkspace({
workspaceId: workspace.id,
}),
sendEmail({
Promise.all([
toggleWebhooksForWorkspace({
workspaceId: workspace.id,
}),

sendEmail({
email: session.user.email,
subject: "New webhook added",
react: WebhookAdded({
email: session.user.email,
subject: "New webhook added",
react: WebhookAdded({
email: session.user.email,
workspace: {
name: workspace.name,
slug: workspace.slug,
},
webhook: {
name,
},
}),
workspace: {
name: workspace.name,
slug: workspace.slug,
},
webhook: {
name,
},
}),
...(links && links.length > 0 ? [linkCache.mset(links), []] : []),

...(isLinkLevelWebhook(webhook) ? [webhookCache.set(webhook)] : []),
]);
})(),
}),
]),
);

return NextResponse.json(transformWebhook(webhook), { status: 201 });
Expand Down
Loading
Loading