From 9497dc36fdc47e9cbb09015e07623ca986845d2d Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 17:59:37 -0400 Subject: [PATCH 01/17] connection params --- packages-v1/api-v1/trpc/routers/connection.ts | 83 ++++++++++++++++--- packages-v1/api-v1/trpc/routers/index.ts | 5 ++ 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 2315b2e26..6da70356c 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,27 +1,90 @@ +import {TRPCError} from '@trpc/server' import {z} from 'zod' +import {eq, schema} from '@openint/db' import {publicProcedure, router} from '../_base' +import {core} from '../../models' +import {zListParams} from './index' export const connectionRouter = router({ + getConnection: publicProcedure + // TODO: make zId('conn') + .input(z.object({id: z.string(), force_refresh: z.boolean().optional()})) + .output(core.connection) + .query(async ({ctx, input}) => { + const connection = await ctx.db.query.connection.findFirst({ + where: eq(schema.connection.id, input.id), + }) + if (!connection || !connection.connector_config_id) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Connection not found', + }) + } + + const credentialsRequiresRefresh = + input.force_refresh || + connection.settings.oauth?.credentials?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() + : false + + if (credentialsRequiresRefresh) { + // TODO: handle force_refresh + console.warn( + 'skipping credentialsRequiresRefresh', + credentialsRequiresRefresh, + ) + } + + return { + id: connection.id, + connector_name: connection.connector_name, + settings: connection.settings, + connector_config_id: connection.connector_config_id, + created_at: connection.created_at, + updated_at: connection.updated_at, + } + }), listConnections: publicProcedure .meta({ openapi: {method: 'GET', path: '/connection'}, }) - .input(z.void()) + .input( + zListParams.extend({ + connector_name: z.string().optional(), + force_refresh: z.boolean().optional(), + customer_id: z.string().optional(), + // TODO: make zId('ccfg').optional() + // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" + connector_config_id: z.string().optional(), + }), + ) .output( z.object({ - // items: z.array(core.connection), - items: z.array( - z.object({ - id: z.string(), - connector_config_id: z.string().nullable(), - }), - ), + items: z.array(core.connection), }), ) - .query(async ({ctx}) => { + .query(async ({ctx, input}) => { const connections = await ctx.db.query.connection.findMany({ - with: {connector_config: {}}, + with: { + connector_config_id: input.connector_config_id, + customer_id: input.customer_id, + connector_name: input.connector_name, + }, + }) + const connectionsRequiringRefresh = connections.filter((c) => { + const credentialsExpired = c.settings.oauth?.credentials?.expires_at + ? new Date(c.settings.oauth.credentials.expires_at) < new Date() + : false + return input.force_refresh || credentialsExpired }) + if (connectionsRequiringRefresh.length > 0) { + // TODO: add refresh logic here + console.warn( + 'skipping connectionsRequiringRefresh', + connectionsRequiringRefresh, + ) + } return {items: connections} }), }) diff --git a/packages-v1/api-v1/trpc/routers/index.ts b/packages-v1/api-v1/trpc/routers/index.ts index 089584eb4..efa7653b2 100644 --- a/packages-v1/api-v1/trpc/routers/index.ts +++ b/packages-v1/api-v1/trpc/routers/index.ts @@ -18,6 +18,11 @@ const generalRouter = router({ .query(({ctx}) => ctx.viewer), }) +export const zListParams = z.object({ + limit: z.number().optional(), + offset: z.number().optional(), +}) + export const appRouter = trpc.mergeRouters( connectionRouter, connectorConfigRouter, From a8dc83dc71066b284de6f1250165367f9be68761 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 18:58:43 -0400 Subject: [PATCH 02/17] implementing list connectin --- packages-v1/api-v1/trpc/routers/connection.ts | 142 +++++++++++++----- packages-v1/api-v1/trpc/routers/index.ts | 9 ++ 2 files changed, 111 insertions(+), 40 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 6da70356c..3499e13ba 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,14 +1,14 @@ import {TRPCError} from '@trpc/server' import {z} from 'zod' -import {eq, schema} from '@openint/db' +import {count, eq, schema, SQL} from '@openint/db' import {publicProcedure, router} from '../_base' import {core} from '../../models' -import {zListParams} from './index' +import {zListParams, zListResponse} from './index' export const connectionRouter = router({ getConnection: publicProcedure // TODO: make zId('conn') - .input(z.object({id: z.string(), force_refresh: z.boolean().optional()})) + .input(z.object({id: z.string()})) .output(core.connection) .query(async ({ctx, input}) => { const connection = await ctx.db.query.connection.findFirst({ @@ -21,21 +21,6 @@ export const connectionRouter = router({ }) } - const credentialsRequiresRefresh = - input.force_refresh || - connection.settings.oauth?.credentials?.expires_at - ? new Date(connection.settings.oauth.credentials.expires_at) < - new Date() - : false - - if (credentialsRequiresRefresh) { - // TODO: handle force_refresh - console.warn( - 'skipping credentialsRequiresRefresh', - credentialsRequiresRefresh, - ) - } - return { id: connection.id, connector_name: connection.connector_name, @@ -52,39 +37,116 @@ export const connectionRouter = router({ .input( zListParams.extend({ connector_name: z.string().optional(), - force_refresh: z.boolean().optional(), customer_id: z.string().optional(), // TODO: make zId('ccfg').optional() // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" connector_config_id: z.string().optional(), }), ) - .output( + .output(zListResponse(core.connection)) + .query(async ({ctx, input}) => { + const limit = input.limit ?? 50 + const offset = input.offset ?? 0 + + const whereConditions: SQL[] = [] + + if (input.connector_config_id) { + whereConditions.push( + eq(schema.connection.connector_config_id, input.connector_config_id), + ) + } + if (input.customer_id) { + whereConditions.push( + eq(schema.connection.customer_id, input.customer_id), + ) + } + if (input.connector_name) { + whereConditions.push( + eq(schema.connection.connector_name, input.connector_name), + ) + } + + const whereClause = + whereConditions.length > 0 + ? whereConditions.reduce( + (acc, condition) => { + if (acc === true) return condition + return acc && condition + }, + true as boolean | SQL, + ) + : undefined + + // Use a single query with COUNT(*) OVER() to get both results and total count + const result = await ctx.db + .select({ + connection: schema.connection, + total: count(), + }) + .from(schema.connection) + .where(whereClause as SQL) + .limit(limit) + .offset(offset) + + const connections = result.map((r) => r.connection) + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + return { + items: connections.map((conn) => ({ + id: conn.id, + connector_name: conn.connector_name, + settings: conn.settings, + connector_config_id: conn.connector_config_id!, + created_at: conn.created_at, + updated_at: conn.updated_at, + })), + total, + limit, + offset, + } + }), + checkConnection: publicProcedure + .meta({ + openapi: {method: 'POST', path: '/connection/{id}/check'}, + }) + .input( z.object({ - items: z.array(core.connection), + id: z.string(), + force_refresh: z.boolean().optional(), }), ) - .query(async ({ctx, input}) => { - const connections = await ctx.db.query.connection.findMany({ - with: { - connector_config_id: input.connector_config_id, - customer_id: input.customer_id, - connector_name: input.connector_name, - }, + .output(core.connection) + .mutation(async ({ctx, input}) => { + const connection = await ctx.db.query.connection.findFirst({ + where: eq(schema.connection.id, input.id), }) - const connectionsRequiringRefresh = connections.filter((c) => { - const credentialsExpired = c.settings.oauth?.credentials?.expires_at - ? new Date(c.settings.oauth.credentials.expires_at) < new Date() + if (!connection || !connection.connector_config_id) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Connection not found', + }) + } + + const credentialsRequiresRefresh = + input.force_refresh || + connection.settings.oauth?.credentials?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() : false - return input.force_refresh || credentialsExpired - }) - if (connectionsRequiringRefresh.length > 0) { - // TODO: add refresh logic here - console.warn( - 'skipping connectionsRequiringRefresh', - connectionsRequiringRefresh, - ) + + if (credentialsRequiresRefresh) { + // TODO: implement refresh logic here + console.warn('Connection requires refresh', credentialsRequiresRefresh) + // Add actual refresh implementation + } + + return { + id: connection.id, + connector_name: connection.connector_name, + settings: connection.settings, + connector_config_id: connection.connector_config_id, + created_at: connection.created_at, + updated_at: connection.updated_at, } - return {items: connections} }), }) diff --git a/packages-v1/api-v1/trpc/routers/index.ts b/packages-v1/api-v1/trpc/routers/index.ts index efa7653b2..32de42e64 100644 --- a/packages-v1/api-v1/trpc/routers/index.ts +++ b/packages-v1/api-v1/trpc/routers/index.ts @@ -23,6 +23,15 @@ export const zListParams = z.object({ offset: z.number().optional(), }) +export function zListResponse(itemSchema: T) { + return z.object({ + items: z.array(itemSchema), + total: z.number(), + limit: z.number(), + offset: z.number(), + }) +} + export const appRouter = trpc.mergeRouters( connectionRouter, connectorConfigRouter, From dfe4a7c75c39809337e3eea8f7e378faf7c332f9 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:14:04 -0400 Subject: [PATCH 03/17] adding logo_url --- packages-v1/api-v1/trpc/routers/connection.ts | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 3499e13ba..d3066eb23 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,10 +1,37 @@ import {TRPCError} from '@trpc/server' import {z} from 'zod' +import {defConnectors} from '@openint/all-connectors/connectors.def' import {count, eq, schema, SQL} from '@openint/db' import {publicProcedure, router} from '../_base' import {core} from '../../models' import {zListParams, zListResponse} from './index' +// TODO: don't make any +function formatConnection(connection: any) { + const connector = + defConnectors[connection.connector_name as keyof typeof defConnectors] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found for connection ${connection.id}`, + }) + } + + const logoUrl = + connector.metadata && + 'logoUrl' in connector.metadata && + connector.metadata.logoUrl?.startsWith('http') + ? connector.metadata.logoUrl + : connector.metadata && 'logoUrl' in connector.metadata + ? `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` + : undefined + + return { + ...connection, + logo_url: logoUrl, + } +} + export const connectionRouter = router({ getConnection: publicProcedure // TODO: make zId('conn') @@ -21,14 +48,7 @@ export const connectionRouter = router({ }) } - return { - id: connection.id, - connector_name: connection.connector_name, - settings: connection.settings, - connector_config_id: connection.connector_config_id, - created_at: connection.created_at, - updated_at: connection.updated_at, - } + return formatConnection(connection) }), listConnections: publicProcedure .meta({ @@ -92,14 +112,7 @@ export const connectionRouter = router({ const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 return { - items: connections.map((conn) => ({ - id: conn.id, - connector_name: conn.connector_name, - settings: conn.settings, - connector_config_id: conn.connector_config_id!, - created_at: conn.created_at, - updated_at: conn.updated_at, - })), + items: connections.map((conn) => formatConnection(conn)), total, limit, offset, From e07dafbfa6f973f2108a22b4c02feb2f5ea7955b Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:31:28 -0400 Subject: [PATCH 04/17] meta --- packages-v1/api-v1/trpc/routers/connection.ts | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index d3066eb23..2ab1cb43f 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -7,7 +7,7 @@ import {core} from '../../models' import {zListParams, zListResponse} from './index' // TODO: don't make any -function formatConnection(connection: any) { +function formatConnection(connection: any, include_secrets: boolean = false) { const connector = defConnectors[connection.connector_name as keyof typeof defConnectors] if (!connector) { @@ -23,19 +23,24 @@ function formatConnection(connection: any) { connector.metadata.logoUrl?.startsWith('http') ? connector.metadata.logoUrl : connector.metadata && 'logoUrl' in connector.metadata - ? `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` + ? // TODO: replace this with our own custom domain + `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` : undefined return { ...connection, logo_url: logoUrl, + ...(include_secrets ? {settings: connection.settings} : {}), } } export const connectionRouter = router({ getConnection: publicProcedure + .meta({ + openapi: {method: 'GET', path: '/connection/{id}'}, + }) // TODO: make zId('conn') - .input(z.object({id: z.string()})) + .input(z.object({id: z.string(), include_secrets: z.boolean().optional()})) .output(core.connection) .query(async ({ctx, input}) => { const connection = await ctx.db.query.connection.findFirst({ @@ -48,7 +53,7 @@ export const connectionRouter = router({ }) } - return formatConnection(connection) + return formatConnection(connection, input.include_secrets ?? false) }), listConnections: publicProcedure .meta({ @@ -61,6 +66,7 @@ export const connectionRouter = router({ // TODO: make zId('ccfg').optional() // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" connector_config_id: z.string().optional(), + include_secrets: z.boolean().optional(), }), ) .output(zListResponse(core.connection)) @@ -112,7 +118,9 @@ export const connectionRouter = router({ const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 return { - items: connections.map((conn) => formatConnection(conn)), + items: connections.map((conn) => + formatConnection(conn, input.include_secrets ?? false), + ), total, limit, offset, @@ -126,6 +134,7 @@ export const connectionRouter = router({ z.object({ id: z.string(), force_refresh: z.boolean().optional(), + include_secrets: z.boolean().optional(), }), ) .output(core.connection) @@ -153,13 +162,6 @@ export const connectionRouter = router({ // Add actual refresh implementation } - return { - id: connection.id, - connector_name: connection.connector_name, - settings: connection.settings, - connector_config_id: connection.connector_config_id, - created_at: connection.created_at, - updated_at: connection.updated_at, - } + return formatConnection(connection, input.include_secrets ?? false) }), }) From d60b5d9b55f7d78334e8a9d792bf922f737cbdb9 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:33:28 -0400 Subject: [PATCH 05/17] adding pagination to events --- packages-v1/api-v1/trpc/routers/event.ts | 65 +++++++++++++++--------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index a72567885..bd037a13c 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -1,34 +1,51 @@ -import {z} from 'zod' -import {schema} from '@openint/db' +import {count, schema} from '@openint/db' +import {zListParams, zListResponse} from '.' import {publicProcedure, router} from '../_base' import {core} from '../../models' export const eventRouter = router({ - createEvent: publicProcedure - .meta({ - openapi: {method: 'POST', path: '/event'}, - }) - .input(core.event_insert) // Ref does not work for input params for now in zod-openapi. So will be inlined in the spec unfortunately - .output(core.event) - .mutation(async ({ctx, input}) => { - const [event] = await ctx.db - .insert(schema.event) - .values(input) - .returning() - return event! - }), + // NOTE: why publish this API? + // createEvent: publicProcedure + // .meta({ + // openapi: {method: 'POST', path: '/event'}, + // }) + // .input(core.event_insert) // Ref does not work for input params for now in zod-openapi. So will be inlined in the spec unfortunately + // .output(core.event) + // .mutation(async ({ctx, input}) => { + // const [event] = await ctx.db + // .insert(schema.event) + // .values(input) + // .returning() + // return event! + // }), listEvents: publicProcedure .meta({ openapi: {method: 'GET', path: '/event'}, }) - .input(z.void()) - .output( - z.object({ - items: z.array(core.event), - }), - ) - .query(async ({ctx}) => { - const events = await ctx.db.query.event.findMany({}) - return {items: events} + .input(zListParams) + .output(zListResponse(core.event)) + .query(async ({ctx, input}) => { + const limit = input.limit ?? 50 + const offset = input.offset ?? 0 + + // Use a single query with COUNT(*) OVER() to get both results and total count + const result = await ctx.db + .select({ + event: schema.event, + total: count(), + }) + .from(schema.event) + .limit(limit) + .offset(offset) + + const events = result.map((r) => r.event) + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + return { + items: events, + total, + limit, + offset, + } }), }) From 53e98c3e329d02263e28081f00efc191d5f58889 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:37:43 -0400 Subject: [PATCH 06/17] adding pagination to ccfg --- .../api-v1/trpc/routers/connectorConfig.ts | 62 +++++++++++++++---- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 6e6717bdf..8df10a188 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -1,23 +1,63 @@ import {z} from 'zod' +import {count, eq, schema, SQL} from '@openint/db' +import {zListParams, zListResponse} from '.' import {authenticatedProcedure, router} from '../_base' - -// import {core} from '../../models' +import {core} from '../../models' export const connectorConfigRouter = router({ listConnectorConfigs: authenticatedProcedure .meta({ openapi: {method: 'GET', path: '/connector-config'}, }) - .input(z.void()) - .output( - z.object({ - // TODO: Fix me by consolidating db and db-v1 finally - items: z.array(z.object({id: z.string(), org_id: z.string()})), + .input( + zListParams.extend({ + // TODO: make this a valid connector_name instead of string + connector_name: z.string().optional(), }), ) - .query(async ({ctx}) => { - const connectorConfigs = await ctx.db.query.connector_config.findMany({}) - console.log('connectorConfigs', connectorConfigs) - return {items: connectorConfigs} + .output(zListResponse(core.connector_config)) + .query(async ({ctx, input}) => { + const limit = input.limit ?? 50 + const offset = input.offset ?? 0 + + const whereConditions: SQL[] = [] + + if (input.connector_name) { + whereConditions.push( + eq(schema.connector_config.connector_name, input.connector_name), + ) + } + + const whereClause = + whereConditions.length > 0 + ? whereConditions.reduce( + (acc, condition) => { + if (acc === true) return condition + return acc && condition + }, + true as boolean | SQL, + ) + : undefined + + // Use a single query with COUNT(*) OVER() to get both results and total count + const result = await ctx.db + .select({ + connector_config: schema.connector_config, + total: count(), + }) + .from(schema.connector_config) + .where(whereClause as SQL) + .limit(limit) + .offset(offset) + + const connectorConfigs = result.map((r) => r.connector_config) + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + return { + items: connectorConfigs, + total, + limit, + offset, + } }), }) From c23027eefd31e1447bd05b05b55c3a7fb67d118f Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:41:40 -0400 Subject: [PATCH 07/17] adding order by --- packages-v1/api-v1/trpc/routers/connection.ts | 3 ++- packages-v1/api-v1/trpc/routers/connectorConfig.ts | 3 ++- packages-v1/api-v1/trpc/routers/event.ts | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 2ab1cb43f..30ffc5011 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,7 +1,7 @@ import {TRPCError} from '@trpc/server' import {z} from 'zod' import {defConnectors} from '@openint/all-connectors/connectors.def' -import {count, eq, schema, SQL} from '@openint/db' +import {count, desc, eq, schema, SQL} from '@openint/db' import {publicProcedure, router} from '../_base' import {core} from '../../models' import {zListParams, zListResponse} from './index' @@ -111,6 +111,7 @@ export const connectionRouter = router({ }) .from(schema.connection) .where(whereClause as SQL) + .orderBy(desc(schema.connection.created_at)) .limit(limit) .offset(offset) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 8df10a188..d6cfbd285 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -1,5 +1,5 @@ import {z} from 'zod' -import {count, eq, schema, SQL} from '@openint/db' +import {count, desc, eq, schema, SQL} from '@openint/db' import {zListParams, zListResponse} from '.' import {authenticatedProcedure, router} from '../_base' import {core} from '../../models' @@ -47,6 +47,7 @@ export const connectorConfigRouter = router({ }) .from(schema.connector_config) .where(whereClause as SQL) + .orderBy(desc(schema.connector_config.created_at)) .limit(limit) .offset(offset) diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index bd037a13c..68b20428c 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -1,4 +1,4 @@ -import {count, schema} from '@openint/db' +import {count, desc, schema} from '@openint/db' import {zListParams, zListResponse} from '.' import {publicProcedure, router} from '../_base' import {core} from '../../models' @@ -35,6 +35,7 @@ export const eventRouter = router({ total: count(), }) .from(schema.event) + .orderBy(desc(schema.event.timestamp)) .limit(limit) .offset(offset) From 30dc9e5e06a38f96b96e273f602e73f78e8f7274 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 19:46:22 -0400 Subject: [PATCH 08/17] type improvs --- packages-v1/api-v1/trpc/routers/connection.ts | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 30ffc5011..58e70832d 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -6,8 +6,10 @@ import {publicProcedure, router} from '../_base' import {core} from '../../models' import {zListParams, zListResponse} from './index' -// TODO: don't make any -function formatConnection(connection: any, include_secrets: boolean = false) { +function formatConnection( + connection: z.infer, + include_secrets: boolean = false, +) { const connector = defConnectors[connection.connector_name as keyof typeof defConnectors] if (!connector) { @@ -53,7 +55,11 @@ export const connectionRouter = router({ }) } - return formatConnection(connection, input.include_secrets ?? false) + return formatConnection( + // TODO: fix this + connection as any as z.infer, + input.include_secrets ?? false, + ) }), listConnections: publicProcedure .meta({ @@ -95,11 +101,10 @@ export const connectionRouter = router({ const whereClause = whereConditions.length > 0 ? whereConditions.reduce( - (acc, condition) => { - if (acc === true) return condition - return acc && condition + (acc, condition, index) => { + return index === 0 ? condition : acc && condition }, - true as boolean | SQL, + undefined as unknown as SQL, ) : undefined @@ -120,7 +125,11 @@ export const connectionRouter = router({ return { items: connections.map((conn) => - formatConnection(conn, input.include_secrets ?? false), + formatConnection( + // TODO: fix this + conn as any as z.infer, + input.include_secrets ?? false, + ), ), total, limit, @@ -152,10 +161,10 @@ export const connectionRouter = router({ const credentialsRequiresRefresh = input.force_refresh || - connection.settings.oauth?.credentials?.expires_at + (connection.settings.oauth?.credentials?.expires_at ? new Date(connection.settings.oauth.credentials.expires_at) < new Date() - : false + : false) if (credentialsRequiresRefresh) { // TODO: implement refresh logic here @@ -163,6 +172,10 @@ export const connectionRouter = router({ // Add actual refresh implementation } - return formatConnection(connection, input.include_secrets ?? false) + return formatConnection( + // TODO: fix this + connection as any as z.infer, + input.include_secrets ?? false, + ) }), }) From b61fe5d2ce0a73e02b8ebc387db839eb54960966 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 20:09:46 -0400 Subject: [PATCH 09/17] making input optional --- packages-v1/api-v1/trpc/routers/connection.ts | 34 ++++++++++--------- .../api-v1/trpc/routers/connectorConfig.ts | 18 +++++----- packages-v1/api-v1/trpc/routers/event.ts | 6 ++-- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 58e70832d..21aa45f30 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -66,35 +66,37 @@ export const connectionRouter = router({ openapi: {method: 'GET', path: '/connection'}, }) .input( - zListParams.extend({ - connector_name: z.string().optional(), - customer_id: z.string().optional(), - // TODO: make zId('ccfg').optional() - // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" - connector_config_id: z.string().optional(), - include_secrets: z.boolean().optional(), - }), + zListParams + .extend({ + connector_name: z.string().optional(), + customer_id: z.string().optional(), + // TODO: make zId('ccfg').optional() + // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" + connector_config_id: z.string().optional(), + include_secrets: z.boolean().optional(), + }) + .optional(), ) .output(zListResponse(core.connection)) .query(async ({ctx, input}) => { - const limit = input.limit ?? 50 - const offset = input.offset ?? 0 + const limit = input?.limit ?? 50 + const offset = input?.offset ?? 0 const whereConditions: SQL[] = [] - if (input.connector_config_id) { + if (input?.connector_config_id) { whereConditions.push( eq(schema.connection.connector_config_id, input.connector_config_id), ) } - if (input.customer_id) { + if (input?.customer_id) { whereConditions.push( - eq(schema.connection.customer_id, input.customer_id), + eq(schema.connection.customer_id, input?.customer_id ?? ''), ) } - if (input.connector_name) { + if (input?.connector_name) { whereConditions.push( - eq(schema.connection.connector_name, input.connector_name), + eq(schema.connection.connector_name, input?.connector_name ?? ''), ) } @@ -128,7 +130,7 @@ export const connectionRouter = router({ formatConnection( // TODO: fix this conn as any as z.infer, - input.include_secrets ?? false, + input?.include_secrets, ), ), total, diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index d6cfbd285..871c18c76 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -10,21 +10,23 @@ export const connectorConfigRouter = router({ openapi: {method: 'GET', path: '/connector-config'}, }) .input( - zListParams.extend({ - // TODO: make this a valid connector_name instead of string - connector_name: z.string().optional(), - }), + zListParams + .extend({ + // TODO: make this a valid connector_name instead of string + connector_name: z.string().optional(), + }) + .optional(), ) .output(zListResponse(core.connector_config)) .query(async ({ctx, input}) => { - const limit = input.limit ?? 50 - const offset = input.offset ?? 0 + const limit = input?.limit ?? 50 + const offset = input?.offset ?? 0 const whereConditions: SQL[] = [] - if (input.connector_name) { + if (input?.connector_name) { whereConditions.push( - eq(schema.connector_config.connector_name, input.connector_name), + eq(schema.connector_config.connector_name, input?.connector_name), ) } diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index 68b20428c..48d4ab061 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -22,11 +22,11 @@ export const eventRouter = router({ .meta({ openapi: {method: 'GET', path: '/event'}, }) - .input(zListParams) + .input(zListParams.optional()) .output(zListResponse(core.event)) .query(async ({ctx, input}) => { - const limit = input.limit ?? 50 - const offset = input.offset ?? 0 + const limit = input?.limit ?? 50 + const offset = input?.offset ?? 0 // Use a single query with COUNT(*) OVER() to get both results and total count const result = await ctx.db From 666705878b49fb50e3919ceb6a6962d5a9f3853e Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Mon, 3 Mar 2025 20:11:37 -0400 Subject: [PATCH 10/17] note --- packages-v1/api-v1/trpc/routers/connection.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 21aa45f30..f9f2e433e 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -33,6 +33,7 @@ function formatConnection( ...connection, logo_url: logoUrl, ...(include_secrets ? {settings: connection.settings} : {}), + // TODO: add display_name? } } From 4155ec6e3c315c6effb575feddff7ec7e8095bed Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 13:55:44 +0100 Subject: [PATCH 11/17] Update packages-v1/api-v1/trpc/routers/connectorConfig.ts Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- packages-v1/api-v1/trpc/routers/connectorConfig.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 871c18c76..0b2badfd3 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -26,7 +26,7 @@ export const connectorConfigRouter = router({ if (input?.connector_name) { whereConditions.push( - eq(schema.connector_config.connector_name, input?.connector_name), + eq(schema.connector_config.connector_name, input.connector_name), ) } From 1a539671472f59bcea2754a88d21b06716001c73 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 13:55:52 +0100 Subject: [PATCH 12/17] Update packages-v1/api-v1/trpc/routers/connection.ts Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- packages-v1/api-v1/trpc/routers/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index f9f2e433e..928f69dc0 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -131,7 +131,7 @@ export const connectionRouter = router({ formatConnection( // TODO: fix this conn as any as z.infer, - input?.include_secrets, + input?.include_secrets ?? false, ), ), total, From df09aedf58da84fcf910f0c9bb85e02a1ba4a5a4 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 12:32:47 -0400 Subject: [PATCH 13/17] adding connector utils --- .../api-v1/trpc/utils/connectorUtils.ts | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 packages-v1/api-v1/trpc/utils/connectorUtils.ts diff --git a/packages-v1/api-v1/trpc/utils/connectorUtils.ts b/packages-v1/api-v1/trpc/utils/connectorUtils.ts new file mode 100644 index 000000000..7fb66a171 --- /dev/null +++ b/packages-v1/api-v1/trpc/utils/connectorUtils.ts @@ -0,0 +1,44 @@ +import {TRPCError} from '@trpc/server' +import {z} from 'zod' +import {defConnectors} from '@openint/all-connectors/connectors.def' +import {core} from '../../models' + +export const zExpandOptions = z + .enum(['connector']) + .describe('Fields to expand: connector (includes connector details)') + +export async function expandConnector( + connectorConfig: z.infer, +) { + const connectorName = connectorConfig.connector_name + + const connector = defConnectors[connectorName as keyof typeof defConnectors] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found: ${connectorName}`, + }) + } + + const logoUrl = + connector.metadata && + 'logoUrl' in connector.metadata && + connector.metadata.logoUrl?.startsWith('http') + ? connector.metadata.logoUrl + : connector.metadata && 'logoUrl' in connector.metadata + ? // TODO: replace this with our own custom domain + `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` + : undefined + + return { + // TODO: add more fields? + connector_name: connectorName, + // TODO: add display_name? + // display_name: connectorConfig.display_name, + // TODO: add enabled? + // enabled: connectorConfig.enabled, + created_at: connectorConfig.created_at, + updated_at: connectorConfig.updated_at, + logo_url: logoUrl, + } +} From f35c78dab5ce46784983bb1ed1d658cbcbf53a8a Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 12:33:03 -0400 Subject: [PATCH 14/17] generalizing pagination processing --- packages-v1/api-v1/trpc/routers/index.ts | 44 ++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/packages-v1/api-v1/trpc/routers/index.ts b/packages-v1/api-v1/trpc/routers/index.ts index 32de42e64..d3818847f 100644 --- a/packages-v1/api-v1/trpc/routers/index.ts +++ b/packages-v1/api-v1/trpc/routers/index.ts @@ -1,5 +1,6 @@ import {z} from 'zod' import {zViewer} from '@openint/cdk' +import {Column, desc, schema} from '@openint/db' import {publicProcedure, router, trpc} from '../_base' import {connectionRouter} from './connection' import {connectorConfigRouter} from './connectorConfig' @@ -39,4 +40,47 @@ export const appRouter = trpc.mergeRouters( generalRouter, ) +export function applyPaginationAndOrder< + T extends {orderBy: Function; limit: Function; offset: Function}, + P extends {limit?: number; offset?: number} | undefined, +>( + query: T, + orderByColumn: Column = schema.connection.created_at, + params?: P, + orderDirection: 'asc' | 'desc' = 'desc', +): {query: T; limit: number; offset: number} { + // Process pagination parameters + const limit = params?.limit ?? 50 + const offset = params?.offset ?? 0 + + // Apply ordering + let modifiedQuery = query.orderBy( + orderDirection === 'desc' ? desc(orderByColumn) : orderByColumn, + ) as T + + // Apply pagination + modifiedQuery = modifiedQuery.limit(limit).offset(offset) as T + + return {query: modifiedQuery, limit, offset} +} + +export async function processPaginatedResponse( + query: any, + entityKey: T, +): Promise<{ + items: Array<(typeof schema)[T] extends {$inferSelect: infer U} ? U : never> + total: number +}> { + // note in future we can add db specific error handling here + const result = await query + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + const items = result.map((r: any) => r[entityKey]) + + return { + items, + total, + } +} + export type AppRouter = typeof appRouter From b26f1c4856a13df374df7c4f90efdc43d8620718 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 12:33:46 -0400 Subject: [PATCH 15/17] using and() for wheres, adding extend, include_secrets and refresh_policy params --- packages-v1/api-v1/trpc/routers/connection.ts | 272 ++++++++++++------ .../api-v1/trpc/routers/connectorConfig.ts | 79 ++--- packages-v1/api-v1/trpc/routers/event.ts | 37 +-- packages/db/index.ts | 1 + packages/db/schema/schema.ts | 1 + 5 files changed, 248 insertions(+), 142 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index f9f2e433e..64a9e2371 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,14 +1,48 @@ import {TRPCError} from '@trpc/server' import {z} from 'zod' import {defConnectors} from '@openint/all-connectors/connectors.def' -import {count, desc, eq, schema, SQL} from '@openint/db' -import {publicProcedure, router} from '../_base' +import {serverConnectors} from '@openint/all-connectors/connectors.server' +import {and, count, eq, schema} from '@openint/db' +import {publicProcedure, router, RouterContext} from '../_base' import {core} from '../../models' -import {zListParams, zListResponse} from './index' +import {expandConnector} from '../utils/connectorUtils' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from './index' -function formatConnection( +const zIncludeSecrets = z + .enum(['none', 'basic', 'all']) + .describe( + 'Controls secret inclusion: none (default), basic (auth only), or all secrets', + ) +const zRefreshPolicy = z + .enum(['none', 'force', 'auto']) + .describe( + 'Controls credential refresh: none (never), force (always), or auto (when expired, default)', + ) + +const zConnectionStatus = z + .enum(['healthy', 'disconnected', 'error', 'manual']) + .describe( + 'Connection status: healthy (all well), disconnected (needs reconnection), error (system issue), manual (import connection)', + ) + +const zConnectionError = z + .enum(['refresh_failed', 'unknown_external_error']) + .describe('Error types: refresh_failed and unknown_external_error') + +const zExpandOptions = z + .enum(['connector']) + .describe('Fields to expand: connector (includes connector details)') + +async function formatConnection( + ctx: RouterContext, connection: z.infer, - include_secrets: boolean = false, + include_secrets: z.infer = 'none', + expand: z.infer[] = [], ) { const connector = defConnectors[connection.connector_name as keyof typeof defConnectors] @@ -19,21 +53,42 @@ function formatConnection( }) } - const logoUrl = - connector.metadata && - 'logoUrl' in connector.metadata && - connector.metadata.logoUrl?.startsWith('http') - ? connector.metadata.logoUrl - : connector.metadata && 'logoUrl' in connector.metadata - ? // TODO: replace this with our own custom domain - `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` - : undefined + // Handle different levels of secret inclusion + let settingsToInclude = {} + if (include_secrets === 'basic' && connection.settings.oauth) { + settingsToInclude = { + settings: { + ...connection.settings, + oauth: { + credentials: connection.settings.oauth.credentials, + }, + }, + } + } else if (include_secrets === 'all') { + settingsToInclude = {settings: connection.settings} + } + + let expandedFields = {} + if (expand.includes('connector')) { + const connectorConfig = await ctx.db.query.connector_config.findFirst({ + where: eq(schema.connector_config.id, connection.connector_config_id), + }) + if (!connectorConfig) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector config not found: ${connection.connector_config_id}`, + }) + } + + expandedFields = { + connector: expandConnector(connectorConfig), + } + } return { ...connection, - logo_url: logoUrl, - ...(include_secrets ? {settings: connection.settings} : {}), - // TODO: add display_name? + ...settingsToInclude, + ...expandedFields, } } @@ -43,7 +98,14 @@ export const connectionRouter = router({ openapi: {method: 'GET', path: '/connection/{id}'}, }) // TODO: make zId('conn') - .input(z.object({id: z.string(), include_secrets: z.boolean().optional()})) + .input( + z.object({ + id: z.string(), + include_secrets: zIncludeSecrets.optional().default('none'), + refresh_policy: zRefreshPolicy.optional().default('auto'), + expand: z.array(zExpandOptions).optional().default([]), + }), + ) .output(core.connection) .query(async ({ctx, input}) => { const connection = await ctx.db.query.connection.findFirst({ @@ -56,10 +118,27 @@ export const connectionRouter = router({ }) } + const credentialsRequiresRefresh = + input.refresh_policy === 'force' || + (input.refresh_policy === 'auto' && + connection.settings.oauth?.credentials?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() + : false) + + if (credentialsRequiresRefresh) { + console.warn( + 'Credentials require refresh for connection id, skipping: ' + + connection.id, + ) + } + return formatConnection( - // TODO: fix this + ctx, + // TODO: fix this any casting connection as any as z.infer, - input.include_secrets ?? false, + input.include_secrets, + input.expand, ) }), listConnections: publicProcedure @@ -72,66 +151,53 @@ export const connectionRouter = router({ connector_name: z.string().optional(), customer_id: z.string().optional(), // TODO: make zId('ccfg').optional() - // but we get Type 'ZodOptional>' is missing the following properties from type 'ZodType': "~standard", "~validate" connector_config_id: z.string().optional(), - include_secrets: z.boolean().optional(), + include_secrets: zIncludeSecrets.optional().default('none'), + expand: z.array(zExpandOptions).optional().default([]), }) .optional(), ) .output(zListResponse(core.connection)) .query(async ({ctx, input}) => { - const limit = input?.limit ?? 50 - const offset = input?.offset ?? 0 - - const whereConditions: SQL[] = [] - - if (input?.connector_config_id) { - whereConditions.push( - eq(schema.connection.connector_config_id, input.connector_config_id), - ) - } - if (input?.customer_id) { - whereConditions.push( - eq(schema.connection.customer_id, input?.customer_id ?? ''), - ) - } - if (input?.connector_name) { - whereConditions.push( - eq(schema.connection.connector_name, input?.connector_name ?? ''), - ) - } - - const whereClause = - whereConditions.length > 0 - ? whereConditions.reduce( - (acc, condition, index) => { - return index === 0 ? condition : acc && condition - }, - undefined as unknown as SQL, - ) - : undefined - - // Use a single query with COUNT(*) OVER() to get both results and total count - const result = await ctx.db - .select({ - connection: schema.connection, - total: count(), - }) - .from(schema.connection) - .where(whereClause as SQL) - .orderBy(desc(schema.connection.created_at)) - .limit(limit) - .offset(offset) + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + connection: schema.connection, + total: count(), + }) + .from(schema.connection) + .where( + and( + input?.connector_config_id + ? eq( + schema.connection.connector_config_id, + input.connector_config_id, + ) + : undefined, + input?.customer_id + ? eq(schema.connection.customer_id, input.customer_id) + : undefined, + input?.connector_name + ? eq(schema.connection.connector_name, input.connector_name) + : undefined, + ), + ), + schema.connection.created_at, + input, + ) - const connections = result.map((r) => r.connection) - const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + const {items, total} = await processPaginatedResponse(query, 'connection') return { - items: connections.map((conn) => - formatConnection( - // TODO: fix this - conn as any as z.infer, - input?.include_secrets, + items: await Promise.all( + items.map((conn) => + formatConnection( + ctx, + // @ts-ignore, QQ why is connector_config_id string|null in schema? + conn, + input?.include_secrets ?? 'none', + input?.expand ?? [], + ), ), ), total, @@ -145,12 +211,18 @@ export const connectionRouter = router({ }) .input( z.object({ + // TODO: make zId('conn') id: z.string(), - force_refresh: z.boolean().optional(), - include_secrets: z.boolean().optional(), }), ) - .output(core.connection) + .output( + z.object({ + id: z.string(), + status: zConnectionStatus, + error: zConnectionError.optional(), + errorMessage: z.string().optional(), + }), + ) .mutation(async ({ctx, input}) => { const connection = await ctx.db.query.connection.findFirst({ where: eq(schema.connection.id, input.id), @@ -162,12 +234,11 @@ export const connectionRouter = router({ }) } - const credentialsRequiresRefresh = - input.force_refresh || - (connection.settings.oauth?.credentials?.expires_at - ? new Date(connection.settings.oauth.credentials.expires_at) < - new Date() - : false) + const credentialsRequiresRefresh = connection.settings.oauth?.credentials + ?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() + : false if (credentialsRequiresRefresh) { // TODO: implement refresh logic here @@ -175,10 +246,41 @@ export const connectionRouter = router({ // Add actual refresh implementation } - return formatConnection( - // TODO: fix this - connection as any as z.infer, - input.include_secrets ?? false, - ) + const connector = + serverConnectors[ + connection.connector_name as keyof typeof serverConnectors + ] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found for connection ${connection.id}`, + }) + } + + if ( + 'checkConnection' in connector && + typeof connector.checkConnection === 'function' + ) { + try { + await connector.checkConnection(connection.settings as any) + // QQ: should this parse the results of checkConnection somehow? + return { + id: connection.id, + status: 'healthy', + } + } catch (error) { + return { + id: connection.id, + status: 'disconnected', + error: 'unknown_external_error', + } + } + } + + // QQ: should we return healthy by default even if there's no check connection implemented? + return { + id: connection.id, + status: 'healthy', + } }), }) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 871c18c76..4df48be1b 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -1,8 +1,14 @@ import {z} from 'zod' -import {count, desc, eq, schema, SQL} from '@openint/db' -import {zListParams, zListResponse} from '.' +import {and, count, eq, schema} from '@openint/db' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from '.' import {authenticatedProcedure, router} from '../_base' import {core} from '../../models' +import {expandConnector, zExpandOptions} from '../utils/connectorUtils' export const connectorConfigRouter = router({ listConnectorConfigs: authenticatedProcedure @@ -12,52 +18,47 @@ export const connectorConfigRouter = router({ .input( zListParams .extend({ - // TODO: make this a valid connector_name instead of string + expand: z.array(zExpandOptions).optional().default([]), connector_name: z.string().optional(), }) .optional(), ) .output(zListResponse(core.connector_config)) .query(async ({ctx, input}) => { - const limit = input?.limit ?? 50 - const offset = input?.offset ?? 0 + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + connector_config: schema.connector_config, + total: count(), + }) + .from(schema.connector_config) + .where( + and( + input?.connector_name + ? eq( + schema.connector_config.connector_name, + input.connector_name, + ) + : undefined, + ), + ), + schema.connector_config.created_at, + input, + ) - const whereConditions: SQL[] = [] - - if (input?.connector_name) { - whereConditions.push( - eq(schema.connector_config.connector_name, input?.connector_name), - ) - } - - const whereClause = - whereConditions.length > 0 - ? whereConditions.reduce( - (acc, condition) => { - if (acc === true) return condition - return acc && condition - }, - true as boolean | SQL, - ) - : undefined - - // Use a single query with COUNT(*) OVER() to get both results and total count - const result = await ctx.db - .select({ - connector_config: schema.connector_config, - total: count(), - }) - .from(schema.connector_config) - .where(whereClause as SQL) - .orderBy(desc(schema.connector_config.created_at)) - .limit(limit) - .offset(offset) - - const connectorConfigs = result.map((r) => r.connector_config) - const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + const {items, total} = await processPaginatedResponse( + query, + 'connector_config', + ) return { - items: connectorConfigs, + items: await Promise.all( + items.map(async (ccfg) => + input?.expand.includes('connector') + ? {...ccfg, connector: await expandConnector(ccfg)} + : ccfg, + ), + ), total, limit, offset, diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index 48d4ab061..2137b0347 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -1,5 +1,10 @@ -import {count, desc, schema} from '@openint/db' -import {zListParams, zListResponse} from '.' +import {count, schema} from '@openint/db' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from '.' import {publicProcedure, router} from '../_base' import {core} from '../../models' @@ -25,25 +30,21 @@ export const eventRouter = router({ .input(zListParams.optional()) .output(zListResponse(core.event)) .query(async ({ctx, input}) => { - const limit = input?.limit ?? 50 - const offset = input?.offset ?? 0 + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + event: schema.event, + total: count(), + }) + .from(schema.event), + schema.event.timestamp, + input, + ) - // Use a single query with COUNT(*) OVER() to get both results and total count - const result = await ctx.db - .select({ - event: schema.event, - total: count(), - }) - .from(schema.event) - .orderBy(desc(schema.event.timestamp)) - .limit(limit) - .offset(offset) - - const events = result.map((r) => r.event) - const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + const {items, total} = await processPaginatedResponse(query, 'event') return { - items: events, + items, total, limit, offset, diff --git a/packages/db/index.ts b/packages/db/index.ts index 57521c113..25f506061 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -3,6 +3,7 @@ import {type Database} from './db' import * as schema from './schema/schema' export * from 'drizzle-orm' +export type {QueryBuilder} from 'drizzle-orm/pg-core' export * from './lib/stripeNullByte' export * from './lib/upsert' export {schema} diff --git a/packages/db/schema/schema.ts b/packages/db/schema/schema.ts index ce1c665fe..ce8816546 100644 --- a/packages/db/schema/schema.ts +++ b/packages/db/schema/schema.ts @@ -20,6 +20,7 @@ export const userRole = pgRole('authenticated') // TODO: Missing jwt_org_id() function definition export const connection = pgTable( + // NOTE: shouldn't this have a status or should it calculated based on settings? 'connection', { id: varchar() From afe36fe31605621631a41dceaa0c3c5a226dd509 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 12:55:27 -0400 Subject: [PATCH 16/17] moving utils --- packages-v1/api-v1/trpc/routers/connection.ts | 2 +- .../api-v1/trpc/routers/connectorConfig.ts | 8 +-- packages-v1/api-v1/trpc/routers/event.ts | 6 +- packages-v1/api-v1/trpc/routers/index.ts | 58 ------------------- 4 files changed, 8 insertions(+), 66 deletions(-) diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 64a9e2371..e0faf310f 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -11,7 +11,7 @@ import { processPaginatedResponse, zListParams, zListResponse, -} from './index' +} from '../utils/pagination' const zIncludeSecrets = z .enum(['none', 'basic', 'all']) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 4df48be1b..fabe6d29f 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -1,14 +1,14 @@ import {z} from 'zod' import {and, count, eq, schema} from '@openint/db' +import {authenticatedProcedure, router} from '../_base' +import {core} from '../../models' +import {expandConnector, zExpandOptions} from '../utils/connectorUtils' import { applyPaginationAndOrder, processPaginatedResponse, zListParams, zListResponse, -} from '.' -import {authenticatedProcedure, router} from '../_base' -import {core} from '../../models' -import {expandConnector, zExpandOptions} from '../utils/connectorUtils' +} from '../utils/pagination' export const connectorConfigRouter = router({ listConnectorConfigs: authenticatedProcedure diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index 2137b0347..f127ac5bb 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -1,12 +1,12 @@ import {count, schema} from '@openint/db' +import {publicProcedure, router} from '../_base' +import {core} from '../../models' import { applyPaginationAndOrder, processPaginatedResponse, zListParams, zListResponse, -} from '.' -import {publicProcedure, router} from '../_base' -import {core} from '../../models' +} from '../utils/pagination' export const eventRouter = router({ // NOTE: why publish this API? diff --git a/packages-v1/api-v1/trpc/routers/index.ts b/packages-v1/api-v1/trpc/routers/index.ts index d3818847f..089584eb4 100644 --- a/packages-v1/api-v1/trpc/routers/index.ts +++ b/packages-v1/api-v1/trpc/routers/index.ts @@ -1,6 +1,5 @@ import {z} from 'zod' import {zViewer} from '@openint/cdk' -import {Column, desc, schema} from '@openint/db' import {publicProcedure, router, trpc} from '../_base' import {connectionRouter} from './connection' import {connectorConfigRouter} from './connectorConfig' @@ -19,20 +18,6 @@ const generalRouter = router({ .query(({ctx}) => ctx.viewer), }) -export const zListParams = z.object({ - limit: z.number().optional(), - offset: z.number().optional(), -}) - -export function zListResponse(itemSchema: T) { - return z.object({ - items: z.array(itemSchema), - total: z.number(), - limit: z.number(), - offset: z.number(), - }) -} - export const appRouter = trpc.mergeRouters( connectionRouter, connectorConfigRouter, @@ -40,47 +25,4 @@ export const appRouter = trpc.mergeRouters( generalRouter, ) -export function applyPaginationAndOrder< - T extends {orderBy: Function; limit: Function; offset: Function}, - P extends {limit?: number; offset?: number} | undefined, ->( - query: T, - orderByColumn: Column = schema.connection.created_at, - params?: P, - orderDirection: 'asc' | 'desc' = 'desc', -): {query: T; limit: number; offset: number} { - // Process pagination parameters - const limit = params?.limit ?? 50 - const offset = params?.offset ?? 0 - - // Apply ordering - let modifiedQuery = query.orderBy( - orderDirection === 'desc' ? desc(orderByColumn) : orderByColumn, - ) as T - - // Apply pagination - modifiedQuery = modifiedQuery.limit(limit).offset(offset) as T - - return {query: modifiedQuery, limit, offset} -} - -export async function processPaginatedResponse( - query: any, - entityKey: T, -): Promise<{ - items: Array<(typeof schema)[T] extends {$inferSelect: infer U} ? U : never> - total: number -}> { - // note in future we can add db specific error handling here - const result = await query - const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 - - const items = result.map((r: any) => r[entityKey]) - - return { - items, - total, - } -} - export type AppRouter = typeof appRouter From 7199b96de7b70ca78b58cedf02ea13183b367a55 Mon Sep 17 00:00:00 2001 From: Amadeo Pellicce Date: Tue, 4 Mar 2025 12:56:48 -0400 Subject: [PATCH 17/17] adding pagination --- packages-v1/api-v1/trpc/utils/pagination.ts | 58 +++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 packages-v1/api-v1/trpc/utils/pagination.ts diff --git a/packages-v1/api-v1/trpc/utils/pagination.ts b/packages-v1/api-v1/trpc/utils/pagination.ts new file mode 100644 index 000000000..517445133 --- /dev/null +++ b/packages-v1/api-v1/trpc/utils/pagination.ts @@ -0,0 +1,58 @@ +import {z} from 'zod' +import {Column, desc, schema} from '@openint/db' + +export const zListParams = z.object({ + limit: z.number().optional(), + offset: z.number().optional(), +}) + +export function zListResponse(itemSchema: T) { + return z.object({ + items: z.array(itemSchema), + total: z.number(), + limit: z.number(), + offset: z.number(), + }) +} +export function applyPaginationAndOrder< + T extends {orderBy: Function; limit: Function; offset: Function}, + P extends {limit?: number; offset?: number} | undefined, +>( + query: T, + orderByColumn: Column = schema.connection.created_at, + params?: P, + orderDirection: 'asc' | 'desc' = 'desc', +): {query: T; limit: number; offset: number} { + // Process pagination parameters + const limit = params?.limit ?? 50 + const offset = params?.offset ?? 0 + + // Apply ordering + let modifiedQuery = query.orderBy( + orderDirection === 'desc' ? desc(orderByColumn) : orderByColumn, + ) as T + + // Apply pagination + modifiedQuery = modifiedQuery.limit(limit).offset(offset) as T + + return {query: modifiedQuery, limit, offset} +} + +export async function processPaginatedResponse( + query: any, + entityKey: T, +): Promise<{ + items: Array<(typeof schema)[T] extends {$inferSelect: infer U} ? U : never> + total: number +}> { + // note in future we can add db specific error handling here + const result = await query + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + const items = result.map((r: any) => r[entityKey]) + + return { + items, + total, + } +}