diff --git a/.github/workflows/validate-workflow.yml b/.github/workflows/validate-workflow.yml index 648bede83..1e0d55353 100644 --- a/.github/workflows/validate-workflow.yml +++ b/.github/workflows/validate-workflow.yml @@ -17,9 +17,9 @@ jobs: env: # `POSTGRES_HOST` is `postgres` # optional (defaults to `postgres`) - POSTGRES_DB: test + POSTGRES_DB: postgres # required - POSTGRES_PASSWORD: test + POSTGRES_PASSWORD: password # optional (defaults to `5432`) POSTGRES_PORT: 5432 # optional (defaults to `postgres`) @@ -41,14 +41,14 @@ jobs: neon-proxy: image: ghcr.io/timowilhelm/local-neon-http-proxy:main env: - PG_CONNECTION_STRING: postgres://postgres:test@db.localtest.me:5432/test + PG_CONNECTION_STRING: postgres://postgres:password@postgres:5432/postgres ports: - '4444:4444' env: NODE_OPTIONS: --max-old-space-size=8192 VERCEL_TOKEN: ${{ secrets.VERCEL_TOKEN }} - DATABASE_URL: postgres://postgres:test@db.localtest.me:5432/test + DATABASE_URL: postgres://postgres:password@db.localtest.me:5432/postgres steps: - name: Checkout @@ -133,9 +133,6 @@ jobs: npx inngest-cli@latest dev & pnpm wait-on tcp:8288 - - name: Setup upterm session - uses: lhotari/action-upterm@v1 - - name: Run tests run: ./bin/shdotenv -q -e ./apps/web/.env.local pnpm run test::ci diff --git a/apps/web/__tests__/end-to-end.spec.ts b/apps/web/__tests__/end-to-end.spec.ts index 99ecb44c1..76fc35028 100644 --- a/apps/web/__tests__/end-to-end.spec.ts +++ b/apps/web/__tests__/end-to-end.spec.ts @@ -7,7 +7,7 @@ import {env, testEnv, testEnvRequired} from '@openint/env' import {initOpenIntSDK} from '@openint/sdk' import {setupTestOrg, tearDownTestOrg} from './test-utils' -jest.setTimeout(30 * 1000) // long timeout because we have to wait for next.js to compile +jest.setTimeout(60 * 1000) // long timeout because we have to wait for next.js to compile let fixture: Awaited> let sdk: ReturnType diff --git a/packages-v1/api-v1/__tests__/auth.spec.ts b/packages-v1/api-v1/__tests__/auth.spec.ts index 538784324..97e6fcc11 100644 --- a/packages-v1/api-v1/__tests__/auth.spec.ts +++ b/packages-v1/api-v1/__tests__/auth.spec.ts @@ -23,29 +23,33 @@ test.each(Object.entries(viewers))( }, ) -describeEachDatabase({drivers: ['pg']}, (db) => { - test('anon user has no access to connector_config', async () => { - await expect( - trpcClientForViewer(null).listConnectorConfigs.query(), - ).rejects.toThrow('Admin only') - }) +describeEachDatabase( + {drivers: ['pg'], migrate: true, enableExtensions: true}, + (db) => { + test('anon user has no access to connector_config', async () => { + await expect( + trpcClientForViewer(null).listConnectorConfigs.query(), + ).rejects.toThrow('Admin only') + }) - beforeAll(async () => { - await db - .insert(schema.connector_config) - .values({org_id: 'org_123', id: 'ccfg_123'}) - }) + beforeAll(async () => { + await db.$truncateAll() + await db + .insert(schema.connector_config) + .values({org_id: 'org_123', id: 'ccfg_123'}) + }) - test('org has access to its own connector config', async () => { - const client = trpcClientForViewer({role: 'org', orgId: 'org_123'}) - const res = await client.listConnectorConfigs.query() - expect(res.items).toHaveLength(1) - expect(res.items[0]?.id).toEqual('ccfg_123') - }) + test('org has access to its own connector config', async () => { + const client = trpcClientForViewer({role: 'org', orgId: 'org_123'}) + const res = await client.listConnectorConfigs.query() + expect(res.items).toHaveLength(1) + expect(res.items[0]?.id).toEqual('ccfg_123') + }) - test('org has no access to other orgs connector config', async () => { - const client = trpcClientForViewer({role: 'org', orgId: 'org_456'}) - const res = await client.listConnectorConfigs.query() - expect(res.items).toHaveLength(0) - }) -}) + test('org has no access to other orgs connector config', async () => { + const client = trpcClientForViewer({role: 'org', orgId: 'org_456'}) + const res = await client.listConnectorConfigs.query() + expect(res.items).toHaveLength(0) + }) + }, +) diff --git a/packages/db/__generated__/schema.sql b/packages/db/__generated__/schema.sql index f2f7e6fe9..caf9ce12a 100644 --- a/packages/db/__generated__/schema.sql +++ b/packages/db/__generated__/schema.sql @@ -2,12 +2,13 @@ -- PostgreSQL database dump -- --- Dumped from database version 15.10 --- Dumped by pg_dump version 15.10 (Homebrew) +-- Dumped from database version 17.2 (Debian 17.2-1.pgdg120+1) +-- Dumped by pg_dump version 17.2 (Homebrew) SET statement_timeout = 0; SET lock_timeout = 0; SET idle_in_transaction_session_timeout = 0; +SET transaction_timeout = 0; SET client_encoding = 'UTF8'; SET standard_conforming_strings = on; SELECT pg_catalog.set_config('search_path', '', false); @@ -36,7 +37,61 @@ COMMENT ON SCHEMA public IS 'standard public schema'; CREATE FUNCTION public.generate_ulid() RETURNS text LANGUAGE plpgsql - AS $$ DECLARE encoding BYTEA = '0123456789ABCDEFGHJKMNPQRSTVWXYZ'; timestamp BYTEA = E'\\000\\000\\000\\000\\000\\000'; output TEXT = ''; unix_time BIGINT; ulid BYTEA; BEGIN unix_time = (EXTRACT(EPOCH FROM NOW()) * 1000)::BIGINT; timestamp = SET_BYTE(timestamp, 0, (unix_time >> 40)::BIT(8)::INTEGER); timestamp = SET_BYTE(timestamp, 1, (unix_time >> 32)::BIT(8)::INTEGER); timestamp = SET_BYTE(timestamp, 2, (unix_time >> 24)::BIT(8)::INTEGER); timestamp = SET_BYTE(timestamp, 3, (unix_time >> 16)::BIT(8)::INTEGER); timestamp = SET_BYTE(timestamp, 4, (unix_time >> 8)::BIT(8)::INTEGER); timestamp = SET_BYTE(timestamp, 5, unix_time::BIT(8)::INTEGER); ulid = timestamp || gen_random_bytes(10); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 224) >> 5)); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 31))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 1) & 248) >> 3)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 1) & 7) << 2) | ((GET_BYTE(ulid, 2) & 192) >> 6))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 2) & 62) >> 1)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 2) & 1) << 4) | ((GET_BYTE(ulid, 3) & 240) >> 4))); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 3) & 15) << 1) | ((GET_BYTE(ulid, 4) & 128) >> 7))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 4) & 124) >> 2)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 4) & 3) << 3) | ((GET_BYTE(ulid, 5) & 224) >> 5))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 5) & 31))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 6) & 248) >> 3)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 6) & 7) << 2) | ((GET_BYTE(ulid, 7) & 192) >> 6))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 7) & 62) >> 1)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 7) & 1) << 4) | ((GET_BYTE(ulid, 8) & 240) >> 4))); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 8) & 15) << 1) | ((GET_BYTE(ulid, 9) & 128) >> 7))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 9) & 124) >> 2)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 9) & 3) << 3) | ((GET_BYTE(ulid, 10) & 224) >> 5))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 10) & 31))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 11) & 248) >> 3)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 11) & 7) << 2) | ((GET_BYTE(ulid, 12) & 192) >> 6))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 12) & 62) >> 1)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 12) & 1) << 4) | ((GET_BYTE(ulid, 13) & 240) >> 4))); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 13) & 15) << 1) | ((GET_BYTE(ulid, 14) & 128) >> 7))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 14) & 124) >> 2)); output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 14) & 3) << 3) | ((GET_BYTE(ulid, 15) & 224) >> 5))); output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 15) & 31))); RETURN output; END $$; + AS $$ +DECLARE + -- Crockford's Base32 + encoding BYTEA = '0123456789ABCDEFGHJKMNPQRSTVWXYZ'; + timestamp BYTEA = E'\\000\\000\\000\\000\\000\\000'; + output TEXT = ''; + + unix_time BIGINT; + ulid BYTEA; +BEGIN + -- 6 timestamp bytes + unix_time = (EXTRACT(EPOCH FROM NOW()) * 1000)::BIGINT; + timestamp = SET_BYTE(timestamp, 0, (unix_time >> 40)::BIT(8)::INTEGER); + timestamp = SET_BYTE(timestamp, 1, (unix_time >> 32)::BIT(8)::INTEGER); + timestamp = SET_BYTE(timestamp, 2, (unix_time >> 24)::BIT(8)::INTEGER); + timestamp = SET_BYTE(timestamp, 3, (unix_time >> 16)::BIT(8)::INTEGER); + timestamp = SET_BYTE(timestamp, 4, (unix_time >> 8)::BIT(8)::INTEGER); + timestamp = SET_BYTE(timestamp, 5, unix_time::BIT(8)::INTEGER); + + -- 10 entropy bytes + ulid = timestamp || gen_random_bytes(10); + + -- Encode the timestamp + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 224) >> 5)); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 31))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 1) & 248) >> 3)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 1) & 7) << 2) | ((GET_BYTE(ulid, 2) & 192) >> 6))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 2) & 62) >> 1)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 2) & 1) << 4) | ((GET_BYTE(ulid, 3) & 240) >> 4))); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 3) & 15) << 1) | ((GET_BYTE(ulid, 4) & 128) >> 7))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 4) & 124) >> 2)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 4) & 3) << 3) | ((GET_BYTE(ulid, 5) & 224) >> 5))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 5) & 31))); + + -- Encode the entropy + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 6) & 248) >> 3)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 6) & 7) << 2) | ((GET_BYTE(ulid, 7) & 192) >> 6))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 7) & 62) >> 1)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 7) & 1) << 4) | ((GET_BYTE(ulid, 8) & 240) >> 4))); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 8) & 15) << 1) | ((GET_BYTE(ulid, 9) & 128) >> 7))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 9) & 124) >> 2)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 9) & 3) << 3) | ((GET_BYTE(ulid, 10) & 224) >> 5))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 10) & 31))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 11) & 248) >> 3)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 11) & 7) << 2) | ((GET_BYTE(ulid, 12) & 192) >> 6))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 12) & 62) >> 1)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 12) & 1) << 4) | ((GET_BYTE(ulid, 13) & 240) >> 4))); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 13) & 15) << 1) | ((GET_BYTE(ulid, 14) & 128) >> 7))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 14) & 124) >> 2)); + output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 14) & 3) << 3) | ((GET_BYTE(ulid, 15) & 224) >> 5))); + output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 15) & 31))); + + RETURN output; +END +$$; -- @@ -45,7 +100,12 @@ CREATE FUNCTION public.generate_ulid() RETURNS text CREATE FUNCTION public.jwt_customer_id() RETURNS character varying LANGUAGE sql STABLE - AS $$ select coalesce( nullif(current_setting('request.jwt.claim.customer_id', true), ''), (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'customer_id') ) $$; + AS $$ + select coalesce( + nullif(current_setting('request.jwt.claim.customer_id', true), ''), + (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'customer_id') + ) +$$; -- @@ -54,7 +114,12 @@ CREATE FUNCTION public.jwt_customer_id() RETURNS character varying CREATE FUNCTION public.jwt_org_id() RETURNS character varying LANGUAGE sql STABLE - AS $$ select coalesce( nullif(current_setting('request.jwt.claim.org_id', true), ''), (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'org_id') ) $$; + AS $$ + select coalesce( + nullif(current_setting('request.jwt.claim.org_id', true), ''), + (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'org_id') + ) +$$; -- @@ -63,7 +128,12 @@ CREATE FUNCTION public.jwt_org_id() RETURNS character varying CREATE FUNCTION public.jwt_sub() RETURNS character varying LANGUAGE sql STABLE - AS $$ select coalesce( nullif(current_setting('request.jwt.claim.sub', true), ''), (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'sub') ) $$; + AS $$ + select coalesce( + nullif(current_setting('request.jwt.claim.sub', true), ''), + (nullif(current_setting('request.jwt.claims', true), '')::jsonb ->> 'sub') + ) +$$; SET default_tablespace = ''; @@ -71,16 +141,37 @@ SET default_tablespace = ''; SET default_table_access_method = heap; -- --- Name: _migrations; Type: TABLE; Schema: public; Owner: - +-- Name: __drizzle_migrations; Type: TABLE; Schema: public; Owner: - -- -CREATE TABLE public._migrations ( - name text NOT NULL, +CREATE TABLE public.__drizzle_migrations ( + id integer NOT NULL, hash text NOT NULL, - date timestamp with time zone DEFAULT now() NOT NULL + created_at bigint, + created_at_timestamp timestamp without time zone GENERATED ALWAYS AS (to_timestamp(((created_at / 1000))::double precision)) STORED ); +-- +-- Name: __drizzle_migrations_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.__drizzle_migrations_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: __drizzle_migrations_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.__drizzle_migrations_id_seq OWNED BY public.__drizzle_migrations.id; + + -- -- Name: connection; Type: TABLE; Schema: public; Owner: - -- @@ -125,6 +216,24 @@ CREATE TABLE public.connector_config ( ); +-- +-- Name: event; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.event ( + id character varying DEFAULT 'concat(''evt_'', generate_ulid())'::character varying NOT NULL, + name character varying NOT NULL, + data jsonb NOT NULL, + "timestamp" timestamp with time zone DEFAULT now() NOT NULL, + "user" jsonb, + v character varying, + org_id character varying GENERATED ALWAYS AS (("user" ->> 'org_id'::text)) STORED, + user_id character varying GENERATED ALWAYS AS (("user" ->> 'user_id'::text)) STORED, + customer_id character varying GENERATED ALWAYS AS (("user" ->> 'customer_id'::text)) STORED, + CONSTRAINT event_id_prefix_check CHECK (starts_with((id)::text, 'evt_'::text)) +); + + -- -- Name: integration; Type: TABLE; Schema: public; Owner: - -- @@ -165,11 +274,26 @@ CREATE TABLE public.pipeline ( -- --- Name: _migrations _migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- Name: __drizzle_migrations id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.__drizzle_migrations ALTER COLUMN id SET DEFAULT nextval('public.__drizzle_migrations_id_seq'::regclass); + + +-- +-- Name: __drizzle_migrations __drizzle_migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.__drizzle_migrations + ADD CONSTRAINT __drizzle_migrations_pkey PRIMARY KEY (id); + + +-- +-- Name: event event_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- -ALTER TABLE ONLY public._migrations - ADD CONSTRAINT _migrations_pkey PRIMARY KEY (name); +ALTER TABLE ONLY public.event + ADD CONSTRAINT event_pkey PRIMARY KEY (id); -- @@ -232,6 +356,34 @@ CREATE INDEX connection_provider_name ON public.connection USING btree (connecto CREATE INDEX connection_updated_at ON public.connection USING btree (updated_at); +-- +-- Name: event_customer_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX event_customer_id ON public.event USING btree (customer_id); + + +-- +-- Name: event_org_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX event_org_id ON public.event USING btree (org_id); + + +-- +-- Name: event_timestamp; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX event_timestamp ON public.event USING btree ("timestamp"); + + +-- +-- Name: event_user_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX event_user_id ON public.event USING btree (user_id); + + -- -- Name: institution_created_at; Type: INDEX; Schema: public; Owner: - -- @@ -322,7 +474,7 @@ ALTER TABLE ONLY public.connection -- ALTER TABLE ONLY public.connector_config - ADD CONSTRAINT fk_default_pipe_in_source_id FOREIGN KEY (default_pipe_in_source_id) REFERENCES public.connection(id) ON UPDATE RESTRICT ON DELETE RESTRICT; + ADD CONSTRAINT fk_default_pipe_in_source_id FOREIGN KEY (default_pipe_in_source_id) REFERENCES public.connection(id) ON UPDATE RESTRICT ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED; -- @@ -330,7 +482,7 @@ ALTER TABLE ONLY public.connector_config -- ALTER TABLE ONLY public.connector_config - ADD CONSTRAINT fk_default_pipe_out_destination_id FOREIGN KEY (default_pipe_out_destination_id) REFERENCES public.connection(id) ON UPDATE RESTRICT ON DELETE RESTRICT; + ADD CONSTRAINT fk_default_pipe_out_destination_id FOREIGN KEY (default_pipe_out_destination_id) REFERENCES public.connection(id) ON UPDATE RESTRICT ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED; -- @@ -371,12 +523,6 @@ ALTER TABLE ONLY public.pipeline COMMENT ON CONSTRAINT fk_source_id ON public.pipeline IS '@graphql({"foreign_name": "source", "local_name": "sourceOfPipelines"})'; --- --- Name: _migrations; Type: ROW SECURITY; Schema: public; Owner: - --- - -ALTER TABLE public._migrations ENABLE ROW LEVEL SECURITY; - -- -- Name: connection; Type: ROW SECURITY; Schema: public; Owner: - -- @@ -416,6 +562,26 @@ CREATE POLICY customer_access ON public.pipeline TO customer USING (( SELECT (AR WHERE ((connector_config.org_id)::text = (public.jwt_org_id())::text))) AND ((connection.customer_id)::text = (( SELECT public.jwt_customer_id() AS jwt_customer_id))::text))) && ARRAY[pipeline.source_id, pipeline.destination_id]))); +-- +-- Name: event customer_append; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY customer_append ON public.event FOR INSERT TO customer WITH CHECK (((org_id)::text = (public.jwt_org_id())::text)); + + +-- +-- Name: event customer_read; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY customer_read ON public.event FOR SELECT TO customer USING (((org_id)::text = (public.jwt_org_id())::text)); + + +-- +-- Name: event; Type: ROW SECURITY; Schema: public; Owner: - +-- + +ALTER TABLE public.event ENABLE ROW LEVEL SECURITY; + -- -- Name: integration; Type: ROW SECURITY; Schema: public; Owner: - -- @@ -453,6 +619,13 @@ CREATE POLICY org_access ON public.pipeline TO org USING (( SELECT (ARRAY( SELEC WHERE ((i.org_id)::text = (public.jwt_org_id())::text)) @> ARRAY[pipeline.source_id, pipeline.destination_id]))); +-- +-- Name: event org_append; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY org_append ON public.event FOR INSERT TO org WITH CHECK (((org_id)::text = (public.jwt_org_id())::text)); + + -- -- Name: connection org_member_access; Type: POLICY; Schema: public; Owner: - -- @@ -484,6 +657,27 @@ CREATE POLICY org_member_access ON public.pipeline TO authenticated USING ((ARRA WHERE ((i.org_id)::text = (public.jwt_org_id())::text)) @> ARRAY[source_id, destination_id])); +-- +-- Name: event org_member_append; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY org_member_append ON public.event FOR INSERT TO authenticated WITH CHECK (((org_id)::text = (public.jwt_org_id())::text)); + + +-- +-- Name: event org_member_read; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY org_member_read ON public.event FOR SELECT TO authenticated USING (((org_id)::text = (public.jwt_org_id())::text)); + + +-- +-- Name: event org_read; Type: POLICY; Schema: public; Owner: - +-- + +CREATE POLICY org_read ON public.event FOR SELECT TO org USING (((org_id)::text = (public.jwt_org_id())::text)); + + -- -- Name: integration org_write_access; Type: POLICY; Schema: public; Owner: - -- @@ -514,11 +708,11 @@ GRANT USAGE ON SCHEMA public TO org; -- --- Name: TABLE _migrations; Type: ACL; Schema: public; Owner: - +-- Name: TABLE __drizzle_migrations; Type: ACL; Schema: public; Owner: - -- -GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public._migrations TO org; -GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public._migrations TO authenticated; +GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public.__drizzle_migrations TO org; +GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public.__drizzle_migrations TO authenticated; -- @@ -587,6 +781,15 @@ GRANT SELECT(env_name) ON TABLE public.connector_config TO customer; GRANT SELECT(disabled) ON TABLE public.connector_config TO customer; +-- +-- Name: TABLE event; Type: ACL; Schema: public; Owner: - +-- + +GRANT SELECT ON TABLE public.event TO customer; +GRANT SELECT ON TABLE public.event TO org; +GRANT SELECT ON TABLE public.event TO authenticated; + + -- -- Name: TABLE integration; Type: ACL; Schema: public; Owner: - -- @@ -605,20 +808,6 @@ GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public.pipeline TO org; GRANT SELECT,INSERT,DELETE,UPDATE ON TABLE public.pipeline TO authenticated; --- --- Name: DEFAULT PRIVILEGES FOR SEQUENCES; Type: DEFAULT ACL; Schema: public; Owner: - --- - -ALTER DEFAULT PRIVILEGES FOR ROLE cloud_admin IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION; - - --- --- Name: DEFAULT PRIVILEGES FOR TABLES; Type: DEFAULT ACL; Schema: public; Owner: - --- - -ALTER DEFAULT PRIVILEGES FOR ROLE cloud_admin IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION; - - -- -- PostgreSQL database dump complete -- diff --git a/packages/db/__tests__/migration.spec.ts b/packages/db/__tests__/migration.spec.ts index 402b5a616..09378c522 100644 --- a/packages/db/__tests__/migration.spec.ts +++ b/packages/db/__tests__/migration.spec.ts @@ -1,9 +1,19 @@ import {describeEachDatabase} from './test-utils' -describeEachDatabase({drivers: ['pg', 'pglite'], migrate: false}, (db) => { - test('run migration', async () => { - await db.$migrate() - const res = await db.$exec('select * from connector_config') - expect(res.rows).toEqual([]) - }) -}) +// Extension always cause issues... so we just disable pglite for now... +// ReferenceError: You are trying to `import` a file outside of the scope of the test code. + +// at Runtime.linkAndEvaluateModule (node_modules/.pnpm/jest-runtime@30.0.0-alpha.6/node_modules/jest-runtime/build/index.js:678:13) +// at Yr (node_modules/.pnpm/@electric-sql+pglite@0.2.17/node_modules/@electric-sql/pglite/src/extensionUtils.ts:11:5) +describeEachDatabase( + {drivers: ['pg'], migrate: false, enableExtensions: true}, + (db) => { + test('run migration', async () => { + await db.$migrate() + const res = await db.$exec( + 'select count(*) as count from connector_config', + ) + expect(res.rows[0]).toMatchObject({count: expect.anything()}) + }) + }, +) diff --git a/packages/db/__tests__/rls.spec.ts b/packages/db/__tests__/rls.spec.ts new file mode 100644 index 000000000..357071798 --- /dev/null +++ b/packages/db/__tests__/rls.spec.ts @@ -0,0 +1,90 @@ +import {sql} from 'drizzle-orm' +import {describeEachDatabase} from './test-utils' + +describeEachDatabase({drivers: ['pglite']}, (db) => { + beforeAll(async () => { + for (const query of [ + sql` + CREATE TABLE customer_data ( + id SERIAL PRIMARY KEY, + customer_name TEXT NOT NULL, + email TEXT NOT NULL, + account_balance DECIMAL(10, 2) NOT NULL, + account_manager TEXT NOT NULL + ); + `, + // Enable Row-Level Security on the table + sql`ALTER TABLE customer_data ENABLE ROW LEVEL SECURITY;`, + // Insert first sample row + sql` + INSERT INTO customer_data (id, customer_name, email, account_balance, account_manager) + VALUES (100, 'Acme Corporation', 'contact@acme.com', 50000.00, 'manager_alice'); + `, + // Insert second sample row + sql` + INSERT INTO customer_data (customer_name, email, account_balance, account_manager) + VALUES ('Globex Industries', 'info@globex.com', 75000.00, 'manager_bob'); + `, + // Create first role + sql`CREATE ROLE manager_alice;`, + // Create second role + sql`CREATE ROLE manager_bob;`, + // Grant schema usage to both roles + sql`GRANT USAGE ON SCHEMA public TO manager_alice, manager_bob;`, + // Grant SELECT permission on the table to both roles + sql`GRANT ALL ON customer_data TO manager_alice, manager_bob;`, + // Create policy for manager_alice + sql` + CREATE POLICY alice_data_access ON customer_data + FOR ALL + TO manager_alice + USING (account_manager = 'manager_alice') + `, + // Create policy for manager_bob + sql` + CREATE POLICY bob_data_access ON customer_data + FOR ALL + TO manager_bob + USING (account_manager = 'manager_bob'); + `, + ]) { + await db.$exec(query) + } + }) + + test('Update works for valid inserts', async () => { + const res = await db.transaction(async (tx) => { + await tx.execute(`SET LOCAL ROLE manager_alice`) + return await tx.execute(` + INSERT INTO customer_data (id, customer_name, email, account_balance, account_manager) + VALUES (100, 'Updated', 'contact@acme.com', 50000.00, 'manager_alice') + ON CONFLICT (id) + DO UPDATE SET + customer_name = EXCLUDED.customer_name, + email = EXCLUDED.email, + account_balance = EXCLUDED.account_balance, + account_manager = EXCLUDED.account_manager + RETURNING * +`) + }) + expect(res.rows[0]).toMatchObject({id: 100, customer_name: 'Updated'}) + }) + + test('Update fails when insert violates RLS', async () => { + await expect( + db.transaction(async (tx) => { + await tx.execute(`SET LOCAL ROLE manager_alice`) + return await tx.execute(` + INSERT INTO customer_data (id, customer_name) + VALUES (100, 'insert no rls') + ON CONFLICT (id) + DO UPDATE SET + customer_name = EXCLUDED.customer_name + RETURNING * +`) + }), + ).rejects.toThrow( + 'new row violates row-level security policy for table "customer_data"', + ) + }) +}) diff --git a/packages/db/__tests__/test-utils.ts b/packages/db/__tests__/test-utils.ts index ab66a7789..d0e868e22 100644 --- a/packages/db/__tests__/test-utils.ts +++ b/packages/db/__tests__/test-utils.ts @@ -1,63 +1,98 @@ +import path from 'node:path' import {generateDrizzleJson, generateMigration} from 'drizzle-kit/api' import {env, envRequired} from '@openint/env' +import {snakeCase} from '@openint/util' import {schema} from '..' import type {Database, DatabaseDriver} from '../db' import {initDbNeon} from '../db.neon' import {initDbPg} from '../db.pg' import {initDbPGLite} from '../db.pglite' +interface TestDbInitOptions { + url: string + /** For pglite, whether to enable postgres extensions */ + enableExtensions?: boolean +} + export const testDbs = { // neon driver does not work well for migration at the moment and // and should therefore not be used for running migrations - neon: () => - initDbNeon( - env.DATABASE_URL_UNPOOLED ?? envRequired.DATABASE_URL, - {role: 'system'}, - {logger: false}, - ), - pglite: () => initDbPGLite({logger: false, enableExtensions: true}), - pg: () => - // TODO: Make test database url separate env var from prod database url to be safer - initDbPg(env.DATABASE_URL_UNPOOLED ?? envRequired.DATABASE_URL, { - logger: false, - }), + neon: ({url}: TestDbInitOptions) => + initDbNeon(url, {role: 'system'}, {logger: false}), + pg: ({url}: TestDbInitOptions) => initDbPg(url, {logger: false}), + pglite: ({enableExtensions}: TestDbInitOptions) => + initDbPGLite({logger: false, enableExtensions}), } -export interface DescribeEachDatabaseOptions { - drivers?: DatabaseDriver[] +export type DescribeEachDatabaseOptions< + T extends DatabaseDriver = DatabaseDriver, +> = { + randomDatabaseFromFilename?: string + drivers?: T[] migrate?: boolean truncateBeforeAll?: boolean -} -export function describeEachDatabase( - options: DescribeEachDatabaseOptions, - testBlock: (db: Database) => void, + enableExtensions?: boolean +} & Omit + +export function describeEachDatabase( + options: DescribeEachDatabaseOptions, + testBlock: (db: Database) => void, ) { const { - drivers = ['pg', 'pglite'], - migrate = true, - truncateBeforeAll = true, + randomDatabaseFromFilename: prefix, + drivers = ['pglite'], + migrate = false, + truncateBeforeAll = false, + ...testDbOpts } = options const dbEntriesFiltered = Object.entries(testDbs).filter(([d]) => - drivers.includes(d as DatabaseDriver), - ) + drivers.includes(d as any), + ) as Array<[T, (opts: TestDbInitOptions) => Database]> - describe.each(dbEntriesFiltered)('db: %s', (_driver, makeDb) => { - const db = makeDb() + describe.each(dbEntriesFiltered)('db: %s', (driver, makeDb) => { + const baseUrl = new URL( + // TODO: Make test database url separate env var from prod database url to be safer + env.DATABASE_URL_UNPOOLED ?? envRequired.DATABASE_URL, + ) + let baseDb: Database | undefined - if (migrate) { - beforeAll(async () => { - await db.$migrate() - }) + const name = prefix + ? `${snakeCase(path.basename(prefix, path.extname(prefix)))}_${new Date() + .toISOString() + .replaceAll(/[:Z\-\.]/g, '') + .replace(/T/, '_')}_${driver}` + : undefined + const url = new URL(baseUrl) + if (name && url.pathname !== `/${name}`) { + url.pathname = `/${name}` } - if (truncateBeforeAll) { - beforeAll(async () => { + const db = makeDb({url: url.toString(), ...testDbOpts}) + + beforeAll(async () => { + if (driver !== 'pglite' && url.toString() !== baseUrl.toString()) { + baseDb = makeDb({url: baseUrl.toString(), ...testDbOpts}) + await baseDb.execute(`DROP DATABASE IF EXISTS ${name}`) + await baseDb.execute(`CREATE DATABASE ${name}`) + } + if (migrate) { + await db.$migrate() + } + if (truncateBeforeAll) { await db.$truncateAll() - }) - } + } + }) testBlock(db) + + afterAll(async () => { + await db.$end?.() + // Cleaning is not often possible because connection poolers will attempt + // to hold on to references of database preventing drops + // await baseDb?.execute(`DROP DATABASE IF EXISTS ${name}`) + await baseDb?.$end?.() + }, 1000) }) } diff --git a/packages/db/db.neon.ts b/packages/db/db.neon.ts index 4c61ad656..f40028f20 100644 --- a/packages/db/db.neon.ts +++ b/packages/db/db.neon.ts @@ -1,4 +1,4 @@ -import {neon, neonConfig} from '@neondatabase/serverless' +import {HTTPQueryOptions, neon, neonConfig} from '@neondatabase/serverless' import {drizzle as drizzlePgProxy} from 'drizzle-orm/pg-proxy' import {migrate} from 'drizzle-orm/pg-proxy/migrator' import type {Viewer} from '@openint/cdk' @@ -29,8 +29,6 @@ export function initDbNeon( const neonSql = neon(url) const db = drizzlePgProxy(async (query, params, method) => { - const guc = localGucForViewer(viewer) - // NOTE: this should work but it doesn't, for now hardcoding converting any updated_at and created_at to iso strings // import types from 'pg-types' @@ -38,15 +36,30 @@ export function initDbNeon( // console.log('Timestamp parser called with:', value) // return value ? new Date(value).toISOString() : null // }) - const allResponses = await neonSql.transaction( - [ - ...Object.entries(guc).map( - ([key, value]) => neonSql`SELECT set_config(${key}, ${value}, true)`, - ), - neonSql(query, params), - ], - {fullResults: true, arrayMode: method === 'all' /* types, */}, - ) + const opts: HTTPQueryOptions = { + fullResults: true, + arrayMode: method === 'all', + /* types, */ + } + + // Bypass setting guc for system viewer completely to avoid unnecessary transactions + // that prevent things like DROP DATABASE + // guc settings are local to transactions anyways and without setting them should have the + // same impact as reset role + + const allResponses = + viewer.role === 'system' + ? await neonSql(query, params, opts).then((r) => [r]) + : await neonSql.transaction( + [ + ...Object.entries(localGucForViewer(viewer)).map( + ([key, value]) => + neonSql`SELECT set_config(${key}, ${value}, true)`, + ), + neonSql(query, params), + ], + opts, + ) const res = allResponses.pop() // TODO: Make me work for arrayMode: true diff --git a/packages/db/db.pg.ts b/packages/db/db.pg.ts index 870996146..e68359aca 100644 --- a/packages/db/db.pg.ts +++ b/packages/db/db.pg.ts @@ -16,5 +16,8 @@ export function initDbPg(url: string, options: DbOptions = {}) { $migrate() { return migrate(db, getMigrationConfig()) }, + $end() { + return pool.end() + } }) } diff --git a/packages/db/db.pglite.ts b/packages/db/db.pglite.ts index 0c6f0cfea..7410ef197 100644 --- a/packages/db/db.pglite.ts +++ b/packages/db/db.pglite.ts @@ -28,5 +28,8 @@ export function initDbPGLite({ $migrate() { return migrate(db, getMigrationConfig()) }, + $end() { + return pglite.close() + }, }) } diff --git a/packages/db/db.ts b/packages/db/db.ts index 6241b42d0..8008f5d84 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -49,6 +49,7 @@ interface SpecificExtensions { query: string | SQLWrapper, ): Promise<{rows: Array>}> $migrate(): Promise + $end?(): Promise } export function dbFactory( diff --git a/packages/db/lib/upsert-sql.spec.ts b/packages/db/lib/upsert-sql.spec.ts new file mode 100644 index 000000000..e7baeb3a9 --- /dev/null +++ b/packages/db/lib/upsert-sql.spec.ts @@ -0,0 +1,307 @@ +import {generateDrizzleJson, generateMigration} from 'drizzle-kit/api' +import {sql} from 'drizzle-orm' +import {drizzle} from 'drizzle-orm/node-postgres' +import { + boolean, + integer, + jsonb, + pgTable, + primaryKey, + serial, + varchar, +} from 'drizzle-orm/pg-core' +import {dbUpsert, dbUpsertOne, inferTableForUpsert} from './upsert' + +async function formatSql(sqlString: string) { + const prettier = await import('prettier') + const prettierSql = await import('prettier-plugin-sql') + return prettier.format(sqlString, { + parser: 'sql', + plugins: [prettierSql.default], + // https://github.com/un-ts/prettier/tree/master/packages/sql#sql-in-js-with-prettier-plugin-embed + ['language' as 'filepath' /* workaround type error */]: 'postgresql', + }) +} + +const noopDb = drizzle('postgres://noop', {logger: false}) + +test('upsert query', async () => { + const engagement_sequence = pgTable( + 'engagement_sequence', + (t) => ({ + source_id: t.text().notNull(), + // customer_id + // integration_id + // connector_name + // these are all derived + id: t.text().notNull(), + created_at: t + .timestamp({ + precision: 3, + mode: 'string', + }) + .defaultNow() + .notNull(), + updated_at: t + .timestamp({ + precision: 3, + mode: 'string', + }) + .defaultNow() + .notNull(), + is_deleted: t.boolean().default(false).notNull(), + raw: t.jsonb(), + unified: t.jsonb(), + }), + (table) => ({ + primaryKey: primaryKey({ + columns: [table.source_id, table.id], + name: 'engagement_sequence_pkey', + }), + }), + ) + const query = dbUpsert( + noopDb, + engagement_sequence, + [ + { + source_id: 'source_id', + id: '123', + is_deleted: false, + // Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724 + raw: sql`${{hello: 1}}::jsonb`, + unified: sql`${{world: 2}}::jsonb`, + }, + ], + { + shallowMergeJsonbColumns: ['raw', 'unified'], + noDiffColumns: ['updated_at'], + }, + ) + expect(await formatSql(query?.toSQL().sql ?? '')).toMatchInlineSnapshot(` + "insert into + "engagement_sequence" ( + "source_id", + "id", + "created_at", + "updated_at", + "is_deleted", + "raw", + "unified" + ) + values + ( + $1, + $2, + default, + default, + $3, + $4::jsonb, + $5::jsonb + ) + on conflict ("source_id", "id") do + update + set + "is_deleted" = excluded."is_deleted", + "raw" = COALESCE("engagement_sequence"."raw", '{}'::jsonb) || excluded."raw", + "unified" = COALESCE("engagement_sequence"."unified", '{}'::jsonb) || excluded."unified" + where + ( + "engagement_sequence"."is_deleted" IS DISTINCT FROM excluded."is_deleted" + or "engagement_sequence"."raw" IS DISTINCT FROM excluded."raw" + or "engagement_sequence"."unified" IS DISTINCT FROM excluded."unified" + ) + " + `) +}) + +test('upsert param handling inc. jsonb', async () => { + const query = dbUpsertOne( + noopDb, + pgTable('test', { + id: serial().primaryKey(), + is_deleted: boolean(), + data: jsonb(), + data2: jsonb(), + data3: jsonb(), + name: varchar(), + name2: varchar(), + name3: varchar(), + count: integer(), + count2: integer(), + count3: integer(), + }), + { + id: '123' as never, + is_deleted: {} as never, + data: 'abc', + data2: true, + data3: ['123'], + name: 'abc', + name2: true as never, + name3: ['123'] as never, + count: 'abc' as never, + count2: true as never, + count3: ['123'] as never, + }, + { + keyColumns: ['id' as never], + }, + ) + expect(query.toSQL().params).toEqual([ + '123', + {}, + '"abc"', // jsonb + 'true', // jsonb + '["123"]', // jsonb + 'abc', // scalar (varchar) + true, // scalar (varchar) + ['123'], // scalar (varchar) + 'abc', // scalar (integer) + true, // scalar (integer) + ['123'], // scalar (integer) + ]) + expect(await formatSql(query?.toSQL().sql)).toMatchInlineSnapshot(` + "insert into + "test" ( + "id", + "is_deleted", + "data", + "data2", + "data3", + "name", + "name2", + "name3", + "count", + "count2", + "count3" + ) + values + ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + on conflict ("id") do + update + set + "is_deleted" = excluded."is_deleted", + "data" = excluded."data", + "data2" = excluded."data2", + "data3" = excluded."data3", + "name" = excluded."name", + "name2" = excluded."name2", + "name3" = excluded."name3", + "count" = excluded."count", + "count2" = excluded."count2", + "count3" = excluded."count3" + where + ( + "test"."is_deleted" IS DISTINCT FROM excluded."is_deleted" + or "test"."data" IS DISTINCT FROM excluded."data" + or "test"."data2" IS DISTINCT FROM excluded."data2" + or "test"."data3" IS DISTINCT FROM excluded."data3" + or "test"."name" IS DISTINCT FROM excluded."name" + or "test"."name2" IS DISTINCT FROM excluded."name2" + or "test"."name3" IS DISTINCT FROM excluded."name3" + or "test"."count" IS DISTINCT FROM excluded."count" + or "test"."count2" IS DISTINCT FROM excluded."count2" + or "test"."count3" IS DISTINCT FROM excluded."count3" + ) + " + `) +}) + +test('upsert query with inferred table', async () => { + const date = new Date() + const row = { + id: '123', + name: 'abc', + count: 1, + data: {hello: 'world'}, + arr: ['a', 'b'], + date, + } + const table = inferTableForUpsert('test_user', row) + const createTableSql = await generateMigration( + generateDrizzleJson({}), + generateDrizzleJson({table}), + ).then((statements) => statements[0]) + expect(createTableSql).toMatchInlineSnapshot(` + "CREATE TABLE "test_user" ( + "id" text, + "name" text, + "count" text, + "data" jsonb, + "arr" jsonb, + "date" text + ); + " + `) + + const query = dbUpsertOne(drizzle(''), table, row, {keyColumns: ['id']}) + expect(query.toSQL().params).toEqual([ + '123', + 'abc', + 1, + '{"hello":"world"}', + '["a","b"]', + date, + ]) + expect(await formatSql(query.toSQL().sql)).toMatchInlineSnapshot(` + "insert into + "test_user" ("id", "name", "count", "data", "arr", "date") + values + ($1, $2, $3, $4, $5, $6) + on conflict ("id") do + update + set + "name" = excluded."name", + "count" = excluded."count", + "data" = excluded."data", + "arr" = excluded."arr", + "date" = excluded."date" + where + ( + "test_user"."name" IS DISTINCT FROM excluded."name" + or "test_user"."count" IS DISTINCT FROM excluded."count" + or "test_user"."data" IS DISTINCT FROM excluded."data" + or "test_user"."arr" IS DISTINCT FROM excluded."arr" + or "test_user"."date" IS DISTINCT FROM excluded."date" + ) + " + `) +}) + +describe('casing cache bug', () => { + test('repro', () => { + const t1 = pgTable('table', (t) => ({ + id: t.text().primaryKey(), + })) + + expect(noopDb.insert(t1).values({id: '1'}).toSQL().sql).toEqual( + 'insert into "table" ("id") values ($1)', + ) + const t2 = pgTable('table', (t) => ({ + id: t.text().primaryKey(), + name: t.text(), + })) + expect( + noopDb.insert(t2).values({id: '1', name: 'test'}).toSQL().sql, + ).toEqual('insert into "table" ("id", "undefined") values ($1, $2)') + }) + + test('workaround', () => { + // explicit name setting cause the column.keyAsName to be set to false, bypassing casing cache completely + const t1 = pgTable('table', (t) => ({ + id: t.text('id').primaryKey(), + })) + + expect(noopDb.insert(t1).values({id: '1'}).toSQL().sql).toEqual( + 'insert into "table" ("id") values ($1)', + ) + const t2 = pgTable('table', (t) => ({ + id: t.text('id').primaryKey(), + name: t.text('name'), + })) + expect( + noopDb.insert(t2).values({id: '1', name: 'test'}).toSQL().sql, + ).toEqual('insert into "table" ("id", "name") values ($1, $2)') + }) +}) diff --git a/packages/db/lib/upsert.spec.ts b/packages/db/lib/upsert.spec.ts index fa53d562e..a3fa227ae 100644 --- a/packages/db/lib/upsert.spec.ts +++ b/packages/db/lib/upsert.spec.ts @@ -1,291 +1,19 @@ -import {generateDrizzleJson, generateMigration} from 'drizzle-kit/api' import {sql} from 'drizzle-orm' import { - boolean, - integer, - jsonb, - pgTable, - primaryKey, - serial, - varchar, -} from 'drizzle-orm/pg-core' -import {drizzle} from 'drizzle-orm/postgres-js' -import postgres from 'postgres' -import {env} from '@openint/env' -import {dbUpsert, dbUpsertOne, inferTableForUpsert} from './upsert' - -async function formatSql(sqlString: string) { - const prettier = await import('prettier') - const prettierSql = await import('prettier-plugin-sql') - return prettier.format(sqlString, { - parser: 'sql', - plugins: [prettierSql.default], - // https://github.com/un-ts/prettier/tree/master/packages/sql#sql-in-js-with-prettier-plugin-embed - ['language' as 'filepath' /* workaround type error */]: 'postgresql', - }) + describeEachDatabase, + DescribeEachDatabaseOptions, +} from '../__tests__/test-utils' +import {dbUpsert, dbUpsertOne} from './upsert' + +const options: DescribeEachDatabaseOptions = { + randomDatabaseFromFilename: __filename, + migrate: false, + drivers: ['neon', 'pg', 'pglite'], } -const noopDb = drizzle('postgres://noop', {logger: true}) - -test('upsert query', async () => { - const engagement_sequence = pgTable( - 'engagement_sequence', - (t) => ({ - source_id: t.text().notNull(), - // customer_id - // integration_id - // connector_name - // these are all derived - id: t.text().notNull(), - created_at: t - .timestamp({ - precision: 3, - mode: 'string', - }) - .defaultNow() - .notNull(), - updated_at: t - .timestamp({ - precision: 3, - mode: 'string', - }) - .defaultNow() - .notNull(), - is_deleted: t.boolean().default(false).notNull(), - raw: t.jsonb(), - unified: t.jsonb(), - }), - (table) => ({ - primaryKey: primaryKey({ - columns: [table.source_id, table.id], - name: 'engagement_sequence_pkey', - }), - }), - ) - const query = dbUpsert( - noopDb, - engagement_sequence, - [ - { - source_id: 'source_id', - id: '123', - is_deleted: false, - // Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724 - raw: sql`${{hello: 1}}::jsonb`, - unified: sql`${{world: 2}}::jsonb`, - }, - ], - { - shallowMergeJsonbColumns: ['raw', 'unified'], - noDiffColumns: ['updated_at'], - }, - ) - expect(await formatSql(query?.toSQL().sql ?? '')).toMatchInlineSnapshot(` - "insert into - "engagement_sequence" ( - "source_id", - "id", - "created_at", - "updated_at", - "is_deleted", - "raw", - "unified" - ) - values - ( - $1, - $2, - default, - default, - $3, - $4::jsonb, - $5::jsonb - ) - on conflict ("source_id", "id") do - update - set - "is_deleted" = excluded."is_deleted", - "raw" = COALESCE("engagement_sequence"."raw", '{}'::jsonb) || excluded."raw", - "unified" = COALESCE("engagement_sequence"."unified", '{}'::jsonb) || excluded."unified" - where - ( - "engagement_sequence"."is_deleted" IS DISTINCT FROM excluded."is_deleted" - or "engagement_sequence"."raw" IS DISTINCT FROM excluded."raw" - or "engagement_sequence"."unified" IS DISTINCT FROM excluded."unified" - ) - " - `) -}) - -test('upsert param handling inc. jsonb', async () => { - const query = dbUpsertOne( - noopDb, - pgTable('test', { - id: serial().primaryKey(), - is_deleted: boolean(), - data: jsonb(), - data2: jsonb(), - data3: jsonb(), - name: varchar(), - name2: varchar(), - name3: varchar(), - count: integer(), - count2: integer(), - count3: integer(), - }), - { - id: '123' as never, - is_deleted: {} as never, - data: 'abc', - data2: true, - data3: ['123'], - name: 'abc', - name2: true as never, - name3: ['123'] as never, - count: 'abc' as never, - count2: true as never, - count3: ['123'] as never, - }, - { - keyColumns: ['id' as never], - }, - ) - expect(query.toSQL().params).toEqual([ - '123', - {}, - '"abc"', // jsonb - 'true', // jsonb - '["123"]', // jsonb - 'abc', // scalar (varchar) - true, // scalar (varchar) - ['123'], // scalar (varchar) - 'abc', // scalar (integer) - true, // scalar (integer) - ['123'], // scalar (integer) - ]) - expect(await formatSql(query?.toSQL().sql)).toMatchInlineSnapshot(` - "insert into - "test" ( - "id", - "is_deleted", - "data", - "data2", - "data3", - "name", - "name2", - "name3", - "count", - "count2", - "count3" - ) - values - ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - on conflict ("id") do - update - set - "is_deleted" = excluded."is_deleted", - "data" = excluded."data", - "data2" = excluded."data2", - "data3" = excluded."data3", - "name" = excluded."name", - "name2" = excluded."name2", - "name3" = excluded."name3", - "count" = excluded."count", - "count2" = excluded."count2", - "count3" = excluded."count3" - where - ( - "test"."is_deleted" IS DISTINCT FROM excluded."is_deleted" - or "test"."data" IS DISTINCT FROM excluded."data" - or "test"."data2" IS DISTINCT FROM excluded."data2" - or "test"."data3" IS DISTINCT FROM excluded."data3" - or "test"."name" IS DISTINCT FROM excluded."name" - or "test"."name2" IS DISTINCT FROM excluded."name2" - or "test"."name3" IS DISTINCT FROM excluded."name3" - or "test"."count" IS DISTINCT FROM excluded."count" - or "test"."count2" IS DISTINCT FROM excluded."count2" - or "test"."count3" IS DISTINCT FROM excluded."count3" - ) - " - `) -}) - -test('upsert query with inferred table', async () => { - const date = new Date() - const row = { - id: '123', - name: 'abc', - count: 1, - data: {hello: 'world'}, - arr: ['a', 'b'], - date, - } - const table = inferTableForUpsert('test_user', row) - const createTableSql = await generateMigration( - generateDrizzleJson({}), - generateDrizzleJson({table}), - ).then((statements) => statements[0]) - expect(createTableSql).toMatchInlineSnapshot(` - "CREATE TABLE "test_user" ( - "id" text, - "name" text, - "count" text, - "data" jsonb, - "arr" jsonb, - "date" text - ); - " - `) - - const query = dbUpsertOne(drizzle(''), table, row, {keyColumns: ['id']}) - expect(query.toSQL().params).toEqual([ - '123', - 'abc', - 1, - '{"hello":"world"}', - '["a","b"]', - date, - ]) - expect(await formatSql(query.toSQL().sql)).toMatchInlineSnapshot(` - "insert into - "test_user" ("id", "name", "count", "data", "arr", "date") - values - ($1, $2, $3, $4, $5, $6) - on conflict ("id") do - update - set - "name" = excluded."name", - "count" = excluded."count", - "data" = excluded."data", - "arr" = excluded."arr", - "date" = excluded."date" - where - ( - "test_user"."name" IS DISTINCT FROM excluded."name" - or "test_user"."count" IS DISTINCT FROM excluded."count" - or "test_user"."data" IS DISTINCT FROM excluded."data" - or "test_user"."arr" IS DISTINCT FROM excluded."arr" - or "test_user"."date" IS DISTINCT FROM excluded."date" - ) - " - `) -}) - -describe('with db', () => { - // console.log('filename', __filename) - const dbName = 'upsert_db' - - const dbUrl = new URL(env.DATABASE_URL) - dbUrl.pathname = `/${dbName}` - const db = drizzle(dbUrl.toString(), {logger: true}) - +describeEachDatabase(options, (db) => { beforeAll(async () => { - const masterDb = drizzle(env.DATABASE_URL, {logger: true}) - await masterDb.execute(`DROP DATABASE IF EXISTS ${dbName}`) - await masterDb.execute(`CREATE DATABASE ${dbName}`) - await masterDb.$client.end() - - await db.execute(sql` + await db.$exec(sql` CREATE TABLE IF NOT EXISTS "test_user" ( id text PRIMARY KEY, name text default 'unnamed', @@ -309,11 +37,11 @@ describe('with db', () => { }) afterEach(async () => { - await db.execute(sql`TRUNCATE "test_user"`) + await db.$exec(sql`TRUNCATE "test_user"`) }) test('upsert with inferred table', async () => { - const ret = await db.execute(sql`SELECT * FROM "test_user"`) + const {rows: ret} = await db.$exec(sql`SELECT * FROM "test_user"`) expect(ret[0]).toEqual(row) }) @@ -324,7 +52,7 @@ describe('with db', () => { {...row, name: null}, {keyColumns: ['id']}, ) - const ret2 = await db.execute(sql`SELECT * FROM "test_user"`) + const {rows: ret2} = await db.$exec(sql`SELECT * FROM "test_user"`) expect(ret2[0]).toEqual({...row, name: null}) }) @@ -335,7 +63,7 @@ describe('with db', () => { {...row, name: undefined}, {keyColumns: ['id']}, ) - const ret2 = await db.execute(sql`SELECT * FROM "test_user"`) + const {rows: ret2} = await db.$exec(sql`SELECT * FROM "test_user"`) expect(ret2[0]).toEqual(row) }) @@ -346,7 +74,7 @@ describe('with db', () => { {...row, name: undefined}, {keyColumns: ['id'], undefinedAsDefault: true}, ) - const ret2 = await db.execute(sql`SELECT * FROM "test_user"`) + const {rows: ret2} = await db.$exec(sql`SELECT * FROM "test_user"`) expect(ret2[0]).toEqual({...row, name: 'unnamed'}) }) @@ -362,7 +90,7 @@ describe('with db', () => { ) expect( - await db.execute(sql`SELECT * FROM "test_user"`).then((r) => r[0]), + await db.$exec(sql`SELECT * FROM "test_user"`).then((r) => r.rows[0]), ).toEqual({...row, name: 'original'}) }) @@ -372,8 +100,8 @@ describe('with db', () => { ) }) - test('db schema cache causes issue for upsert', async () => { - await db.execute(sql` + test('change inferred schema between upserts to same table', async () => { + await db.$exec(sql` CREATE TABLE IF NOT EXISTS "pipeline" ( id text PRIMARY KEY, num integer, @@ -383,29 +111,12 @@ describe('with db', () => { await dbUpsertOne(db, 'pipeline', {id: 1, num: 123}, {keyColumns: ['id']}) - await expect( - dbUpsertOne(db, 'pipeline', {id: 2, str: 'my'}, {keyColumns: ['id']}), - ).rejects.toThrow('column "undefined" of relation "pipeline"') + // This is no longer the case due to workaround + // await expect( + // dbUpsertOne(db, 'pipeline', {id: 2, str: 'my'}, {keyColumns: ['id']}), + // ).rejects.toThrow('column "undefined" of relation "pipeline"') + // clearing casing cache causes it to work - // casing cache causes it to work - ;(db as any).dialect.casing.clearCache() await dbUpsertOne(db, 'pipeline', {id: 2, str: 'my'}, {keyColumns: ['id']}) - - // recreating the db each time works better - const pg = postgres(dbUrl.toString()) - const d = () => drizzle(pg, {logger: true}) - await dbUpsertOne(d(), 'pipeline', {id: 3, num: 223}, {keyColumns: ['id']}) - await dbUpsertOne(d(), 'pipeline', {id: 4, str: 'my'}, {keyColumns: ['id']}) - const res = await d().execute('SELECT * FROM "pipeline"') - expect(res).toEqual([ - {id: '1', num: 123, str: null}, - {id: '2', num: null, str: 'my'}, - {id: '3', num: 223, str: null}, - {id: '4', num: null, str: 'my'}, - ]) - }) - - afterAll(async () => { - await db.$client.end() }) }) diff --git a/packages/db/lib/upsert.ts b/packages/db/lib/upsert.ts index fb95e3ed0..80709c618 100644 --- a/packages/db/lib/upsert.ts +++ b/packages/db/lib/upsert.ts @@ -94,6 +94,8 @@ export function dbUpsert< return } + // console.log('db upsert', _table.toString(), values, options) + const table = ( typeof _table === 'string' ? inferTableForUpsert(_table, firstRow) : _table ) as TTable @@ -101,7 +103,7 @@ export function dbUpsert< const tbCfg = getTableConfig(table) const getColumnOrThrow = (name: string) => { const col = getColumn(name) - if (!col) { + if (!col || !col.name) { throw new Error(`Column ${name} not found in table ${tbCfg.name}`) } return col @@ -199,10 +201,10 @@ export function inferTableForUpsert( Object.entries(record).map(([k, v]) => [ k, opts?.jsonColumns?.includes(k) || isPlainObject(v) || Array.isArray(v) - ? t.jsonb() + ? t.jsonb(k) : // text() works as a catch all for scalar types because none of them require // the value to be escaped in anyway - (t.text() as unknown as ReturnType), + (t.text(k) as unknown as ReturnType), ]), ), ) diff --git a/packages/db/scripts/formatNeonProxyPayload.ts b/packages/db/scripts/formatNeonProxyPayload.ts new file mode 100644 index 000000000..b49bed47d --- /dev/null +++ b/packages/db/scripts/formatNeonProxyPayload.ts @@ -0,0 +1,156 @@ +/** + * Usage: Copy text body from request to /sql endpoint of neon proxy and use it to run query directly in db + * Example command given the following input + * `pbpaste | bun packages/db/scripts/formatNeonProxyPayload.ts` +{ + "queries": [ + { + "query": "SELECT set_config($1, $2, true)", + "params": [ + "role", + "org" + ] + }, + { + "query": "SELECT set_config($1, $2, true)", + "params": [ + "request.jwt.claim.org_id", + "org_2tpltLjGSrsaDEqBOU5TgNTsLYC" + ] + }, + { + "query": "insert into \"connection\" (\"display_name\", \"customer_id\", \"disabled\", \"metadata\", \"id\") values ($1, $2, $3, $4, $5) on conflict (\"id\") do update set \"display_name\" = excluded.\"display_name\", \"customer_id\" = excluded.\"customer_id\", \"disabled\" = excluded.\"disabled\", \"metadata\" = COALESCE(\"connection\".\"metadata\", '{}'::jsonb) ||excluded.\"metadata\" where (\"connection\".\"display_name\" IS DISTINCT FROM excluded.\"display_name\" or \"connection\".\"customer_id\" IS DISTINCT FROM excluded.\"customer_id\" or \"connection\".\"disabled\" IS DISTINCT FROM excluded.\"disabled\" or \"connection\".\"metadata\" IS DISTINCT FROM excluded.\"metadata\")", + "params": [ + "displayName", + "fooCustomerId", + "false", + "{\"key\":\"value\"}", + "conn_greenhouse_01JNFHEJMQY4TSMDCFQ80HFBXJ" + ] + } + ] +} + */ + +// Generated by AI code below + + +/** + * Interface for a query item in the input JSON + */ +interface QueryItem { + query: string + params: Array +} + +/** + * Interface for the input JSON structure + */ +interface SqlQueriesJson { + queries: QueryItem[] +} + +/** + * Renders SQL queries by replacing parameter placeholders with their actual values + * @param jsonData - The JSON object containing queries and parameters + * @returns Array of rendered SQL queries + */ +function renderSqlQueries(jsonData: SqlQueriesJson): string[] { + const renderedQueries: string[] = [] + + // Check if the input has the expected structure + if (!jsonData || !jsonData.queries || !Array.isArray(jsonData.queries)) { + throw new Error("Invalid input format. Expected JSON with 'queries' array.") + } + + // Process each query in the array + for (const item of jsonData.queries) { + const {query, params} = item + + if (!query || !params || !Array.isArray(params)) { + throw new Error( + "Invalid query item. Each item must have 'query' and 'params' array.", + ) + } + + // Replace each parameter placeholder with its corresponding value + let renderedQuery: string = query + for (let i = 0; i < params.length; i++) { + const placeholder: string = `$${i + 1}` + const paramValue = params[i] + + // Format the parameter value based on its type + let formattedValue: string + if (typeof paramValue === 'string') { + // Check if it's already a JSON string + if (paramValue.startsWith('{') && paramValue.endsWith('}')) { + formattedValue = `'${paramValue}'` + } else { + formattedValue = `'${paramValue}'` + } + } else if (paramValue === null) { + formattedValue = 'NULL' + } else if (typeof paramValue === 'boolean') { + formattedValue = paramValue.toString() + } else if (typeof paramValue === 'object') { + formattedValue = `'${JSON.stringify(paramValue)}'` + } else { + formattedValue = paramValue?.toString() ?? '' + } + + // Replace the placeholder with the formatted value + renderedQuery = renderedQuery.replace(placeholder, formattedValue) + } + + renderedQueries.push(renderedQuery) + } + + return renderedQueries +} + +/** + * Main function that reads from stdin and processes the input + */ +function main(): void { + let inputData = '' + + // Read data from standard input + process.stdin.setEncoding('utf8') + + process.stdin.on('data', (chunk: string) => { + inputData += chunk + }) + + process.stdin.on('end', () => { + try { + console.log('BEGIN;') + // Parse the JSON input + const jsonData: SqlQueriesJson = JSON.parse(inputData) + + // Render the SQL queries + const renderedQueries = renderSqlQueries(jsonData) + + // Output the rendered queries + renderedQueries.forEach((query, index) => { + console.log(`-- Query ${index + 1}:`) + console.log(query + ';') + console.log('') + }) + console.log('COMMIT;') + } catch (error) { + console.error( + 'Error processing input:', + error instanceof Error ? error.message : error, + ) + process.exit(1) + } + }) + + process.stdin.on('error', (error) => { + console.error('Error reading from stdin:', error.message) + process.exit(1) + }) +} + +// Run the main function +main() diff --git a/packages/engine-backend/router/connectionRouter.ts b/packages/engine-backend/router/connectionRouter.ts index df81ec40d..5bcfe3de5 100644 --- a/packages/engine-backend/router/connectionRouter.ts +++ b/packages/engine-backend/router/connectionRouter.ts @@ -195,18 +195,18 @@ export const connectionRouter = trpc.router({ // How do we verify that the userId here is the same as the userId from preConnectOption? .output(z.string()) // TODO(api): We should not return just a string here. Should return an object .mutation(async ({input: {connectorConfigId, settings, ...input}, ctx}) => { - const int = + const ccfg = await ctx.asOrgIfNeeded.getConnectorConfigOrFail(connectorConfigId) - if (int.orgId !== ctx.viewer.orgId) { + if (ccfg.orgId !== ctx.viewer.orgId) { throw new TRPCError({ code: 'FORBIDDEN', - message: `You are not allowed to create connections for ${int.connectorName}`, + message: `You are not allowed to create connections for ${ccfg.connectorName}`, }) } const _extId = makeUlid() - const connId = makeId('conn', int.connector.name, _extId) + const connId = makeId('conn', ccfg.connector.name, _extId) // Should throw if not working.. const connUpdate = { @@ -214,8 +214,8 @@ export const connectionRouter = trpc.router({ // TODO: Should no longer depend on external ID connectionExternalId: _extId, settings, - ...(await int.connector.checkConnection?.({ - config: int.config, + ...(await ccfg.connector.checkConnection?.({ + config: ccfg.config, settings, context: {webhookBaseUrl: ''}, options: {}, @@ -224,11 +224,14 @@ export const connectionRouter = trpc.router({ customerId: ctx.viewer.role === 'customer' ? ctx.viewer.customerId : null, } satisfies ConnectionUpdate - await ctx.asOrgIfNeeded._syncConnectionUpdate(int, connUpdate) + await ctx.asOrgIfNeeded._syncConnectionUpdate(ccfg, connUpdate) // TODO: Do this in one go not two if (input.displayName) { - await ctx.services.patchReturning('connection', connId, input) + await ctx.services.patchReturning('connection', connId, { + ...input, + connectorConfigId, // UPSERT always requires connectorConfig id even if it's an update otherwise RLS will be violated + }) } // TODO: return the entire connection object... return connId @@ -264,7 +267,10 @@ export const connectionRouter = trpc.router({ if (ctx.viewer.role === 'customer') { await ctx.services.getConnectionOrFail(connId, true) } - const conn = await ctx.asOrgIfNeeded.getConnectionExpandedOrFail(connId, true) + const conn = await ctx.asOrgIfNeeded.getConnectionExpandedOrFail( + connId, + true, + ) const {settings, connectorConfig: ccfg} = conn if (!opts?.skipRevoke) { await ccfg.connector.revokeConnection?.( diff --git a/packages/engine-backend/services/sync-service.ts b/packages/engine-backend/services/sync-service.ts index 666d11188..093d6662d 100644 --- a/packages/engine-backend/services/sync-service.ts +++ b/packages/engine-backend/services/sync-service.ts @@ -436,7 +436,7 @@ export function makeSyncService({ } const _syncConnectionUpdate = async ( - int: _ConnectorConfig, + ccfg: _ConnectorConfig, { customerId, settings, @@ -444,7 +444,7 @@ export function makeSyncService({ ...connUpdate }: ConnectionUpdate, ) => { - console.log('[_syncConnectionUpdate]', int.id, { + console.log('[_syncConnectionUpdate]', ccfg.id, { customerId, settings, integration, @@ -452,12 +452,12 @@ export function makeSyncService({ }) const id = makeId( 'conn', - int.connector.name, + ccfg.connector.name, connUpdate.connectionExternalId, ) await metaLinks .handlers({ - connection: {id, connectorConfigId: int.id, customerId}, + connection: {id, connectorConfigId: ccfg.id, customerId}, }) .connUpdate({id, settings, integration})