diff --git a/packages-v1/api-v1/trpc/routers/connection.ts b/packages-v1/api-v1/trpc/routers/connection.ts index 2315b2e26..e0faf310f 100644 --- a/packages-v1/api-v1/trpc/routers/connection.ts +++ b/packages-v1/api-v1/trpc/routers/connection.ts @@ -1,27 +1,286 @@ +import {TRPCError} from '@trpc/server' import {z} from 'zod' -import {publicProcedure, router} from '../_base' +import {defConnectors} from '@openint/all-connectors/connectors.def' +import {serverConnectors} from '@openint/all-connectors/connectors.server' +import {and, count, eq, schema} from '@openint/db' +import {publicProcedure, router, RouterContext} from '../_base' +import {core} from '../../models' +import {expandConnector} from '../utils/connectorUtils' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from '../utils/pagination' + +const zIncludeSecrets = z + .enum(['none', 'basic', 'all']) + .describe( + 'Controls secret inclusion: none (default), basic (auth only), or all secrets', + ) +const zRefreshPolicy = z + .enum(['none', 'force', 'auto']) + .describe( + 'Controls credential refresh: none (never), force (always), or auto (when expired, default)', + ) + +const zConnectionStatus = z + .enum(['healthy', 'disconnected', 'error', 'manual']) + .describe( + 'Connection status: healthy (all well), disconnected (needs reconnection), error (system issue), manual (import connection)', + ) + +const zConnectionError = z + .enum(['refresh_failed', 'unknown_external_error']) + .describe('Error types: refresh_failed and unknown_external_error') + +const zExpandOptions = z + .enum(['connector']) + .describe('Fields to expand: connector (includes connector details)') + +async function formatConnection( + ctx: RouterContext, + connection: z.infer, + include_secrets: z.infer = 'none', + expand: z.infer[] = [], +) { + const connector = + defConnectors[connection.connector_name as keyof typeof defConnectors] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found for connection ${connection.id}`, + }) + } + + // Handle different levels of secret inclusion + let settingsToInclude = {} + if (include_secrets === 'basic' && connection.settings.oauth) { + settingsToInclude = { + settings: { + ...connection.settings, + oauth: { + credentials: connection.settings.oauth.credentials, + }, + }, + } + } else if (include_secrets === 'all') { + settingsToInclude = {settings: connection.settings} + } + + let expandedFields = {} + if (expand.includes('connector')) { + const connectorConfig = await ctx.db.query.connector_config.findFirst({ + where: eq(schema.connector_config.id, connection.connector_config_id), + }) + if (!connectorConfig) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector config not found: ${connection.connector_config_id}`, + }) + } + + expandedFields = { + connector: expandConnector(connectorConfig), + } + } + + return { + ...connection, + ...settingsToInclude, + ...expandedFields, + } +} export const connectionRouter = router({ + getConnection: publicProcedure + .meta({ + openapi: {method: 'GET', path: '/connection/{id}'}, + }) + // TODO: make zId('conn') + .input( + z.object({ + id: z.string(), + include_secrets: zIncludeSecrets.optional().default('none'), + refresh_policy: zRefreshPolicy.optional().default('auto'), + expand: z.array(zExpandOptions).optional().default([]), + }), + ) + .output(core.connection) + .query(async ({ctx, input}) => { + const connection = await ctx.db.query.connection.findFirst({ + where: eq(schema.connection.id, input.id), + }) + if (!connection || !connection.connector_config_id) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Connection not found', + }) + } + + const credentialsRequiresRefresh = + input.refresh_policy === 'force' || + (input.refresh_policy === 'auto' && + connection.settings.oauth?.credentials?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() + : false) + + if (credentialsRequiresRefresh) { + console.warn( + 'Credentials require refresh for connection id, skipping: ' + + connection.id, + ) + } + + return formatConnection( + ctx, + // TODO: fix this any casting + connection as any as z.infer, + input.include_secrets, + input.expand, + ) + }), listConnections: publicProcedure .meta({ openapi: {method: 'GET', path: '/connection'}, }) - .input(z.void()) + .input( + zListParams + .extend({ + connector_name: z.string().optional(), + customer_id: z.string().optional(), + // TODO: make zId('ccfg').optional() + connector_config_id: z.string().optional(), + include_secrets: zIncludeSecrets.optional().default('none'), + expand: z.array(zExpandOptions).optional().default([]), + }) + .optional(), + ) + .output(zListResponse(core.connection)) + .query(async ({ctx, input}) => { + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + connection: schema.connection, + total: count(), + }) + .from(schema.connection) + .where( + and( + input?.connector_config_id + ? eq( + schema.connection.connector_config_id, + input.connector_config_id, + ) + : undefined, + input?.customer_id + ? eq(schema.connection.customer_id, input.customer_id) + : undefined, + input?.connector_name + ? eq(schema.connection.connector_name, input.connector_name) + : undefined, + ), + ), + schema.connection.created_at, + input, + ) + + const {items, total} = await processPaginatedResponse(query, 'connection') + + return { + items: await Promise.all( + items.map((conn) => + formatConnection( + ctx, + // @ts-ignore, QQ why is connector_config_id string|null in schema? + conn, + input?.include_secrets ?? 'none', + input?.expand ?? [], + ), + ), + ), + total, + limit, + offset, + } + }), + checkConnection: publicProcedure + .meta({ + openapi: {method: 'POST', path: '/connection/{id}/check'}, + }) + .input( + z.object({ + // TODO: make zId('conn') + id: z.string(), + }), + ) .output( z.object({ - // items: z.array(core.connection), - items: z.array( - z.object({ - id: z.string(), - connector_config_id: z.string().nullable(), - }), - ), + id: z.string(), + status: zConnectionStatus, + error: zConnectionError.optional(), + errorMessage: z.string().optional(), }), ) - .query(async ({ctx}) => { - const connections = await ctx.db.query.connection.findMany({ - with: {connector_config: {}}, + .mutation(async ({ctx, input}) => { + const connection = await ctx.db.query.connection.findFirst({ + where: eq(schema.connection.id, input.id), }) - return {items: connections} + if (!connection || !connection.connector_config_id) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Connection not found', + }) + } + + const credentialsRequiresRefresh = connection.settings.oauth?.credentials + ?.expires_at + ? new Date(connection.settings.oauth.credentials.expires_at) < + new Date() + : false + + if (credentialsRequiresRefresh) { + // TODO: implement refresh logic here + console.warn('Connection requires refresh', credentialsRequiresRefresh) + // Add actual refresh implementation + } + + const connector = + serverConnectors[ + connection.connector_name as keyof typeof serverConnectors + ] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found for connection ${connection.id}`, + }) + } + + if ( + 'checkConnection' in connector && + typeof connector.checkConnection === 'function' + ) { + try { + await connector.checkConnection(connection.settings as any) + // QQ: should this parse the results of checkConnection somehow? + return { + id: connection.id, + status: 'healthy', + } + } catch (error) { + return { + id: connection.id, + status: 'disconnected', + error: 'unknown_external_error', + } + } + } + + // QQ: should we return healthy by default even if there's no check connection implemented? + return { + id: connection.id, + status: 'healthy', + } }), }) diff --git a/packages-v1/api-v1/trpc/routers/connectorConfig.ts b/packages-v1/api-v1/trpc/routers/connectorConfig.ts index 6e6717bdf..fabe6d29f 100644 --- a/packages-v1/api-v1/trpc/routers/connectorConfig.ts +++ b/packages-v1/api-v1/trpc/routers/connectorConfig.ts @@ -1,23 +1,67 @@ import {z} from 'zod' +import {and, count, eq, schema} from '@openint/db' import {authenticatedProcedure, router} from '../_base' - -// import {core} from '../../models' +import {core} from '../../models' +import {expandConnector, zExpandOptions} from '../utils/connectorUtils' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from '../utils/pagination' export const connectorConfigRouter = router({ listConnectorConfigs: authenticatedProcedure .meta({ openapi: {method: 'GET', path: '/connector-config'}, }) - .input(z.void()) - .output( - z.object({ - // TODO: Fix me by consolidating db and db-v1 finally - items: z.array(z.object({id: z.string(), org_id: z.string()})), - }), + .input( + zListParams + .extend({ + expand: z.array(zExpandOptions).optional().default([]), + connector_name: z.string().optional(), + }) + .optional(), ) - .query(async ({ctx}) => { - const connectorConfigs = await ctx.db.query.connector_config.findMany({}) - console.log('connectorConfigs', connectorConfigs) - return {items: connectorConfigs} + .output(zListResponse(core.connector_config)) + .query(async ({ctx, input}) => { + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + connector_config: schema.connector_config, + total: count(), + }) + .from(schema.connector_config) + .where( + and( + input?.connector_name + ? eq( + schema.connector_config.connector_name, + input.connector_name, + ) + : undefined, + ), + ), + schema.connector_config.created_at, + input, + ) + + const {items, total} = await processPaginatedResponse( + query, + 'connector_config', + ) + + return { + items: await Promise.all( + items.map(async (ccfg) => + input?.expand.includes('connector') + ? {...ccfg, connector: await expandConnector(ccfg)} + : ccfg, + ), + ), + total, + limit, + offset, + } }), }) diff --git a/packages-v1/api-v1/trpc/routers/event.ts b/packages-v1/api-v1/trpc/routers/event.ts index a72567885..f127ac5bb 100644 --- a/packages-v1/api-v1/trpc/routers/event.ts +++ b/packages-v1/api-v1/trpc/routers/event.ts @@ -1,34 +1,53 @@ -import {z} from 'zod' -import {schema} from '@openint/db' +import {count, schema} from '@openint/db' import {publicProcedure, router} from '../_base' import {core} from '../../models' +import { + applyPaginationAndOrder, + processPaginatedResponse, + zListParams, + zListResponse, +} from '../utils/pagination' export const eventRouter = router({ - createEvent: publicProcedure - .meta({ - openapi: {method: 'POST', path: '/event'}, - }) - .input(core.event_insert) // Ref does not work for input params for now in zod-openapi. So will be inlined in the spec unfortunately - .output(core.event) - .mutation(async ({ctx, input}) => { - const [event] = await ctx.db - .insert(schema.event) - .values(input) - .returning() - return event! - }), + // NOTE: why publish this API? + // createEvent: publicProcedure + // .meta({ + // openapi: {method: 'POST', path: '/event'}, + // }) + // .input(core.event_insert) // Ref does not work for input params for now in zod-openapi. So will be inlined in the spec unfortunately + // .output(core.event) + // .mutation(async ({ctx, input}) => { + // const [event] = await ctx.db + // .insert(schema.event) + // .values(input) + // .returning() + // return event! + // }), listEvents: publicProcedure .meta({ openapi: {method: 'GET', path: '/event'}, }) - .input(z.void()) - .output( - z.object({ - items: z.array(core.event), - }), - ) - .query(async ({ctx}) => { - const events = await ctx.db.query.event.findMany({}) - return {items: events} + .input(zListParams.optional()) + .output(zListResponse(core.event)) + .query(async ({ctx, input}) => { + const {query, limit, offset} = applyPaginationAndOrder( + ctx.db + .select({ + event: schema.event, + total: count(), + }) + .from(schema.event), + schema.event.timestamp, + input, + ) + + const {items, total} = await processPaginatedResponse(query, 'event') + + return { + items, + total, + limit, + offset, + } }), }) diff --git a/packages-v1/api-v1/trpc/utils/connectorUtils.ts b/packages-v1/api-v1/trpc/utils/connectorUtils.ts new file mode 100644 index 000000000..7fb66a171 --- /dev/null +++ b/packages-v1/api-v1/trpc/utils/connectorUtils.ts @@ -0,0 +1,44 @@ +import {TRPCError} from '@trpc/server' +import {z} from 'zod' +import {defConnectors} from '@openint/all-connectors/connectors.def' +import {core} from '../../models' + +export const zExpandOptions = z + .enum(['connector']) + .describe('Fields to expand: connector (includes connector details)') + +export async function expandConnector( + connectorConfig: z.infer, +) { + const connectorName = connectorConfig.connector_name + + const connector = defConnectors[connectorName as keyof typeof defConnectors] + if (!connector) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Connector not found: ${connectorName}`, + }) + } + + const logoUrl = + connector.metadata && + 'logoUrl' in connector.metadata && + connector.metadata.logoUrl?.startsWith('http') + ? connector.metadata.logoUrl + : connector.metadata && 'logoUrl' in connector.metadata + ? // TODO: replace this with our own custom domain + `https://cdn.jsdelivr.net/gh/openintegrations/openint@main/apps/web/public/${connector.metadata.logoUrl}` + : undefined + + return { + // TODO: add more fields? + connector_name: connectorName, + // TODO: add display_name? + // display_name: connectorConfig.display_name, + // TODO: add enabled? + // enabled: connectorConfig.enabled, + created_at: connectorConfig.created_at, + updated_at: connectorConfig.updated_at, + logo_url: logoUrl, + } +} diff --git a/packages-v1/api-v1/trpc/utils/pagination.ts b/packages-v1/api-v1/trpc/utils/pagination.ts new file mode 100644 index 000000000..517445133 --- /dev/null +++ b/packages-v1/api-v1/trpc/utils/pagination.ts @@ -0,0 +1,58 @@ +import {z} from 'zod' +import {Column, desc, schema} from '@openint/db' + +export const zListParams = z.object({ + limit: z.number().optional(), + offset: z.number().optional(), +}) + +export function zListResponse(itemSchema: T) { + return z.object({ + items: z.array(itemSchema), + total: z.number(), + limit: z.number(), + offset: z.number(), + }) +} +export function applyPaginationAndOrder< + T extends {orderBy: Function; limit: Function; offset: Function}, + P extends {limit?: number; offset?: number} | undefined, +>( + query: T, + orderByColumn: Column = schema.connection.created_at, + params?: P, + orderDirection: 'asc' | 'desc' = 'desc', +): {query: T; limit: number; offset: number} { + // Process pagination parameters + const limit = params?.limit ?? 50 + const offset = params?.offset ?? 0 + + // Apply ordering + let modifiedQuery = query.orderBy( + orderDirection === 'desc' ? desc(orderByColumn) : orderByColumn, + ) as T + + // Apply pagination + modifiedQuery = modifiedQuery.limit(limit).offset(offset) as T + + return {query: modifiedQuery, limit, offset} +} + +export async function processPaginatedResponse( + query: any, + entityKey: T, +): Promise<{ + items: Array<(typeof schema)[T] extends {$inferSelect: infer U} ? U : never> + total: number +}> { + // note in future we can add db specific error handling here + const result = await query + const total = result.length > 0 ? Number(result[0]?.total ?? 0) : 0 + + const items = result.map((r: any) => r[entityKey]) + + return { + items, + total, + } +} diff --git a/packages/db/index.ts b/packages/db/index.ts index 57521c113..25f506061 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -3,6 +3,7 @@ import {type Database} from './db' import * as schema from './schema/schema' export * from 'drizzle-orm' +export type {QueryBuilder} from 'drizzle-orm/pg-core' export * from './lib/stripeNullByte' export * from './lib/upsert' export {schema} diff --git a/packages/db/schema/schema.ts b/packages/db/schema/schema.ts index ce1c665fe..ce8816546 100644 --- a/packages/db/schema/schema.ts +++ b/packages/db/schema/schema.ts @@ -20,6 +20,7 @@ export const userRole = pgRole('authenticated') // TODO: Missing jwt_org_id() function definition export const connection = pgTable( + // NOTE: shouldn't this have a status or should it calculated based on settings? 'connection', { id: varchar()