Skip to content

feat(v1): API V1 methods #288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 272 additions & 13 deletions packages-v1/api-v1/trpc/routers/connection.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,286 @@
import {TRPCError} from '@trpc/server'
import {z} from 'zod'
import {publicProcedure, router} from '../_base'
import {defConnectors} from '@openint/all-connectors/connectors.def'
import {serverConnectors} from '@openint/all-connectors/connectors.server'
import {and, count, eq, schema} from '@openint/db'
import {publicProcedure, router, RouterContext} from '../_base'
import {core} from '../../models'
import {expandConnector} from '../utils/connectorUtils'
import {
applyPaginationAndOrder,
processPaginatedResponse,
zListParams,
zListResponse,
} from './index'

const zIncludeSecrets = z
.enum(['none', 'basic', 'all'])
.describe(
'Controls secret inclusion: none (default), basic (auth only), or all secrets',
)
const zRefreshPolicy = z
.enum(['none', 'force', 'auto'])
.describe(
'Controls credential refresh: none (never), force (always), or auto (when expired, default)',
)

const zConnectionStatus = z
.enum(['healthy', 'disconnected', 'error', 'manual'])
.describe(
'Connection status: healthy (all well), disconnected (needs reconnection), error (system issue), manual (import connection)',
)

const zConnectionError = z
.enum(['refresh_failed', 'unknown_external_error'])
.describe('Error types: refresh_failed and unknown_external_error')

const zExpandOptions = z
.enum(['connector'])
.describe('Fields to expand: connector (includes connector details)')

async function formatConnection(
ctx: RouterContext,
connection: z.infer<typeof core.connection>,
include_secrets: z.infer<typeof zIncludeSecrets> = 'none',
expand: z.infer<typeof zExpandOptions>[] = [],
) {
const connector =
defConnectors[connection.connector_name as keyof typeof defConnectors]
if (!connector) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Connector not found for connection ${connection.id}`,
})
}

// Handle different levels of secret inclusion
let settingsToInclude = {}
if (include_secrets === 'basic' && connection.settings.oauth) {
settingsToInclude = {
settings: {
...connection.settings,
oauth: {
credentials: connection.settings.oauth.credentials,
},
},
}
} else if (include_secrets === 'all') {
settingsToInclude = {settings: connection.settings}
}

let expandedFields = {}
if (expand.includes('connector')) {
const connectorConfig = await ctx.db.query.connector_config.findFirst({
where: eq(schema.connector_config.id, connection.connector_config_id),
})
if (!connectorConfig) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Connector config not found: ${connection.connector_config_id}`,
})
}

expandedFields = {
connector: expandConnector(connectorConfig),
}
}

return {
...connection,
...settingsToInclude,
...expandedFields,
}
}

export const connectionRouter = router({
getConnection: publicProcedure
.meta({
openapi: {method: 'GET', path: '/connection/{id}'},
})
// TODO: make zId('conn')
.input(
z.object({
id: z.string(),
include_secrets: zIncludeSecrets.optional().default('none'),
refresh_policy: zRefreshPolicy.optional().default('auto'),
expand: z.array(zExpandOptions).optional().default([]),
}),
)
.output(core.connection)
.query(async ({ctx, input}) => {
const connection = await ctx.db.query.connection.findFirst({
where: eq(schema.connection.id, input.id),
})
if (!connection || !connection.connector_config_id) {
throw new TRPCError({
code: 'NOT_FOUND',
message: 'Connection not found',
})
}

const credentialsRequiresRefresh =
input.refresh_policy === 'force' ||
(input.refresh_policy === 'auto' &&
connection.settings.oauth?.credentials?.expires_at
? new Date(connection.settings.oauth.credentials.expires_at) <
new Date()
: false)

if (credentialsRequiresRefresh) {
console.warn(
'Credentials require refresh for connection id, skipping: ' +
connection.id,
)
}

return formatConnection(
ctx,
// TODO: fix this any casting
connection as any as z.infer<typeof core.connection>,
input.include_secrets,
input.expand,
)
}),
listConnections: publicProcedure
.meta({
openapi: {method: 'GET', path: '/connection'},
})
.input(z.void())
.input(
zListParams
.extend({
connector_name: z.string().optional(),
customer_id: z.string().optional(),
// TODO: make zId('ccfg').optional()
connector_config_id: z.string().optional(),
include_secrets: zIncludeSecrets.optional().default('none'),
expand: z.array(zExpandOptions).optional().default([]),
})
.optional(),
)
.output(zListResponse(core.connection))
.query(async ({ctx, input}) => {
const {query, limit, offset} = applyPaginationAndOrder(
ctx.db
.select({
connection: schema.connection,
total: count(),
})
.from(schema.connection)
.where(
and(
input?.connector_config_id
? eq(
schema.connection.connector_config_id,
input.connector_config_id,
)
: undefined,
input?.customer_id
? eq(schema.connection.customer_id, input.customer_id)
: undefined,
input?.connector_name
? eq(schema.connection.connector_name, input.connector_name)
: undefined,
),
),
schema.connection.created_at,
input,
)

const {items, total} = await processPaginatedResponse(query, 'connection')

return {
items: await Promise.all(
items.map((conn) =>
formatConnection(
ctx,
// @ts-ignore, QQ why is connector_config_id string|null in schema?
conn,
input?.include_secrets ?? 'none',
input?.expand ?? [],
),
),
),
total,
limit,
offset,
}
}),
checkConnection: publicProcedure
.meta({
openapi: {method: 'POST', path: '/connection/{id}/check'},
})
.input(
z.object({
// TODO: make zId('conn')
id: z.string(),
}),
)
.output(
z.object({
// items: z.array(core.connection),
items: z.array(
z.object({
id: z.string(),
connector_config_id: z.string().nullable(),
}),
),
id: z.string(),
status: zConnectionStatus,
error: zConnectionError.optional(),
errorMessage: z.string().optional(),
}),
)
.query(async ({ctx}) => {
const connections = await ctx.db.query.connection.findMany({
with: {connector_config: {}},
.mutation(async ({ctx, input}) => {
const connection = await ctx.db.query.connection.findFirst({
where: eq(schema.connection.id, input.id),
})
return {items: connections}
if (!connection || !connection.connector_config_id) {
throw new TRPCError({
code: 'NOT_FOUND',
message: 'Connection not found',
})
}

const credentialsRequiresRefresh = connection.settings.oauth?.credentials
?.expires_at
? new Date(connection.settings.oauth.credentials.expires_at) <
new Date()
: false

if (credentialsRequiresRefresh) {
// TODO: implement refresh logic here
console.warn('Connection requires refresh', credentialsRequiresRefresh)
// Add actual refresh implementation
}

const connector =
serverConnectors[
connection.connector_name as keyof typeof serverConnectors
]
if (!connector) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Connector not found for connection ${connection.id}`,
})
}

if (
'checkConnection' in connector &&
typeof connector.checkConnection === 'function'
) {
try {
await connector.checkConnection(connection.settings as any)
// QQ: should this parse the results of checkConnection somehow?
return {
id: connection.id,
status: 'healthy',
}
} catch (error) {
return {
id: connection.id,
status: 'disconnected',
error: 'unknown_external_error',
}
}
}

// QQ: should we return healthy by default even if there's no check connection implemented?
return {
id: connection.id,
status: 'healthy',
}
}),
})
68 changes: 56 additions & 12 deletions packages-v1/api-v1/trpc/routers/connectorConfig.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,67 @@
import {z} from 'zod'
import {and, count, eq, schema} from '@openint/db'
import {
applyPaginationAndOrder,
processPaginatedResponse,
zListParams,
zListResponse,
} from '.'
import {authenticatedProcedure, router} from '../_base'

// import {core} from '../../models'
import {core} from '../../models'
import {expandConnector, zExpandOptions} from '../utils/connectorUtils'

export const connectorConfigRouter = router({
listConnectorConfigs: authenticatedProcedure
.meta({
openapi: {method: 'GET', path: '/connector-config'},
})
.input(z.void())
.output(
z.object({
// TODO: Fix me by consolidating db and db-v1 finally
items: z.array(z.object({id: z.string(), org_id: z.string()})),
}),
.input(
zListParams
.extend({
expand: z.array(zExpandOptions).optional().default([]),
connector_name: z.string().optional(),
})
.optional(),
)
.query(async ({ctx}) => {
const connectorConfigs = await ctx.db.query.connector_config.findMany({})
console.log('connectorConfigs', connectorConfigs)
return {items: connectorConfigs}
.output(zListResponse(core.connector_config))
.query(async ({ctx, input}) => {
const {query, limit, offset} = applyPaginationAndOrder(
ctx.db
.select({
connector_config: schema.connector_config,
total: count(),
})
.from(schema.connector_config)
.where(
and(
input?.connector_name
? eq(
schema.connector_config.connector_name,
input.connector_name,
)
: undefined,
),
),
schema.connector_config.created_at,
input,
)

const {items, total} = await processPaginatedResponse(
query,
'connector_config',
)

return {
items: await Promise.all(
items.map(async (ccfg) =>
input?.expand.includes('connector')
? {...ccfg, connector: await expandConnector(ccfg)}
: ccfg,
),
),
total,
limit,
offset,
}
}),
})
Loading
Loading