Skip to content

Connection validation and making expires_at optional #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions kits/cdk/internal/oauthConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ export const zOauthConnectionError = z.object({
message: z.string().nullish(),
})

const zOauthCredentials = z.object({
type: zAuthMode,
/** For API key auth... */
api_key: z.string().nullish(),
access_token: z.string().optional(),
refresh_token: z.string().optional(),
// sometimes this is missing from the response
expires_at: z.string().datetime().optional(),
raw: z.object({
access_token: z.string(),
// sometimes this is missing from the response
expires_in: z.number().optional(),
expires_at: z.string().datetime().optional(),
/** Refresh token (Only returned if the REFRESH_TOKEN boolean parameter is set to true and the refresh token is available) */
refresh_token: z.string().nullish(),
refresh_token_expires_in: z.number().nullish(),
token_type: z.string(), //'bearer',
scope: z.string().optional(),
}),
});
export const oauthBaseSchema = {
name: z.literal('__oauth__'), // TODO: This is a noop
connectorConfig: z.object({
Expand All @@ -40,24 +60,7 @@ export const oauthBaseSchema = {
connectionSettings: z.object({
// equivalent to nango /v1/connections data.connection object with certain fields removed like id
oauth: z.object({
credentials: z.object({
type: zAuthMode,
/** For API key auth... */
api_key: z.string().nullish(),
access_token: z.string().optional(),
refresh_token: z.string().optional(),
expires_at: z.string().datetime(),
raw: z.object({
access_token: z.string(),
expires_in: z.number(),
expires_at: z.string().datetime(),
/** Refresh token (Only returned if the REFRESH_TOKEN boolean parameter is set to true and the refresh token is available) */
refresh_token: z.string().nullish(),
refresh_token_expires_in: z.number().nullish(),
token_type: z.string(), //'bearer',
scope: z.string().optional(),
}),
}),
credentials: zOauthCredentials,
connection_config: z
.object({
portalId: z.number().nullish(),
Expand Down Expand Up @@ -156,6 +159,17 @@ export function makeOauthConnectorServer({
},
})
.then((r) => r.data as OauthBaseTypes['connectionSettings'])


const parsed = zOauthCredentials.safeParse(res)
if (!parsed.success) {
console.error(
'Provider did not return valid connection settings',
parsed?.error?.format(),
)
throw new Error('Provider did not return valid connection settings')
}

return {
connectionExternalId: extractId(connId)[2],
settings: {
Expand Down
3 changes: 1 addition & 2 deletions packages/api/proxyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export const proxyHandler = async (req: Request) => {
const protectedContext = getProtectedContext(ctx)
const remoteContext = await getRemoteContext(protectedContext)

const credentialsExpired = remoteContext.remote.settings.oauth?.credentials
.expires_at
const credentialsExpired = remoteContext.remote.settings.oauth?.credentials?.expires_at
? new Date(remoteContext.remote.settings.oauth.credentials.expires_at) <
new Date()
: false
Expand Down
4 changes: 2 additions & 2 deletions packages/engine-backend/router/connectionRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ export const connectionRouter = trpc.router({
.output(z.object({}))
.mutation(async ({input: {id: connId, ...opts}, ctx}) => {
if (ctx.viewer.role === 'customer') {
await ctx.services.getConnectionOrFail(connId)
await ctx.services.getConnectionOrFail(connId, true)
}
const conn = await ctx.asOrgIfNeeded.getConnectionExpandedOrFail(connId)
const conn = await ctx.asOrgIfNeeded.getConnectionExpandedOrFail(connId, true)
const {settings, connectorConfig: ccfg} = conn
if (!opts?.skipRevoke) {
await ccfg.connector.revokeConnection?.(
Expand Down
63 changes: 37 additions & 26 deletions packages/engine-backend/services/dbService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,18 @@ export function makeDBService({
return getOrFail(tableName, id)
}

const getConnectorConfigInfoOrFail = (id: Id['ccfg']) =>
const getConnectorConfigInfoOrFail = (id: Id['ccfg'], skipValidation = false) =>
metaService.listConnectorConfigInfos({id}).then((ints) => {
if (!ints[0]) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `ccfg info not found: ${id}`,
})
}
return ints[0]
return skipValidation ? ints[0] : zRaw.connector_config.parse(ints[0])
})

const getConnectorConfigOrFail = (id: Id['ccfg']) =>
const getConnectorConfigOrFail = (id: Id['ccfg'], skipValidation = false) =>
metaService.tables.connector_config.get(id).then((_ccfg) => {
if (!_ccfg) {
throw new TRPCError({
Expand All @@ -163,11 +163,13 @@ export function makeDBService({
}
const int = zRaw.connector_config.parse(_ccfg)
const connector = getConnectorOrFail(int.id)
const config: {} = connector.schemas.connectorConfig?.parse(int.config)
const config: {} = skipValidation
? int.config
: connector.schemas.connectorConfig?.parse(int.config)
return {...int, connector, config}
})

const getIntegrationOrFail = (id: Id['int']) =>
const getIntegrationOrFail = (id: Id['int'], skipValidation = false) =>
metaService.tables.integration.get(id).then(async (ins) => {
if (!ins) {
throw new TRPCError({
Expand All @@ -182,49 +184,54 @@ export function makeDBService({
ins.standard = provider?.standardMappers?.integration?.(ins.external)
await metaLinks.patch('integration', ins.id, {standard: ins.standard})
}
return zRaw.integration.parse(ins)
return skipValidation ? ins : zRaw.integration.parse(ins)
})
const getConnectionOrFail = (id: Id['conn']) =>
const getConnectionOrFail = (id: Id['conn'], skipValidation = false) =>
metaService.tables.connection.get(id).then((conn) => {
if (!conn) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `conn not found: ${id}`,
})
}
return zRaw.connection.parse(conn)
if (!skipValidation) {
return zRaw.connection.parse(conn)
}
return conn
})
const getPipelineOrFail = (id: Id['pipe']) =>
const getPipelineOrFail = (id: Id['pipe'], skipValidation = false) =>
metaService.tables.pipeline.get(id).then((pipe) => {
if (!pipe) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `pipe not found: ${id}`,
})
}
return zRaw.pipeline.parse(pipe)
return skipValidation ? pipe : zRaw.pipeline.parse(pipe)
})

const getConnectionExpandedOrFail = (id: Id['conn']) =>
const getConnectionExpandedOrFail = (id: Id['conn'], skipValidation = false) =>
getConnectionOrFail(id).then(async (conn) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getConnectionExpandedOrFail function should pass the skipValidation parameter to getConnectionOrFail to ensure consistent behavior.

Suggested change
getConnectionOrFail(id).then(async (conn) => {
getConnectionOrFail(id, skipValidation)

const connectorConfig = await getConnectorConfigOrFail(
conn.connectorConfigId,
skipValidation,
)
const settings: {} =
connectorConfig.connector.schemas.connectionSettings?.parse(
conn.settings,
)
const settings: {} = skipValidation
? conn.settings
: connectorConfig.connector.schemas.connectionSettings?.parse(
conn.settings,
)
const integration = conn.integrationId
? await getIntegrationOrFail(conn.integrationId)
? await getIntegrationOrFail(conn.integrationId, skipValidation)
: undefined
return {...conn, connectorConfig, settings, integration}
})

const getPipelineExpandedOrFail = (id: Id['pipe']) =>
const getPipelineExpandedOrFail = (id: Id['pipe'], skipValidation = false) =>
getPipelineOrFail(id).then(async (pipe) => {
const [source, destination] = await Promise.all([
getConnectionExpandedOrFail(pipe.sourceId!),
getConnectionExpandedOrFail(pipe.destinationId!),
getConnectionExpandedOrFail(pipe.sourceId!, skipValidation),
getConnectionExpandedOrFail(pipe.destinationId!, skipValidation),
])
// if (
// pipe.sourceState != null &&
Expand All @@ -244,13 +251,17 @@ export function makeDBService({
// message: `destinationState is not supported for ${destination.connectorConfig.connector.name}`,
// })
// }
const sourceState: {} = (
source.connectorConfig.connector.schemas.sourceState ?? z.unknown()
).parse(pipe.sourceState)
const destinationState: {} = (
destination.connectorConfig.connector.schemas.destinationState ??
z.unknown()
).parse(pipe.destinationState)
const sourceState: {} = skipValidation
? pipe.sourceState
: (
source.connectorConfig.connector.schemas.sourceState ?? z.unknown()
).parse(pipe.sourceState)
const destinationState: {} = skipValidation
? pipe.destinationState
: (
destination.connectorConfig.connector.schemas.destinationState ??
z.unknown()
).parse(pipe.destinationState)
// const links = R.pipe(
// rest.linkOptions ?? pipeline?.linkOptions ?? [],
// R.map((l) =>
Expand Down