Skip to content

Commit 8b19a70

Browse files
Source Postgres : use more simple and comprehensive query to get selectable tables (#14251)
* use more simple and comprehensive query to get selectable tables * cover case when schema is not specified * add test to check discover with different ways of grants * format * incr ver * incr ver * auto-bump connector version Co-authored-by: Octavia Squidington III <[email protected]>
1 parent c6ff5ab commit 8b19a70

File tree

6 files changed

+168
-120
lines changed

6 files changed

+168
-120
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@
747747
- name: Postgres
748748
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
749749
dockerRepository: airbyte/source-postgres
750-
dockerImageTag: 0.4.29
750+
dockerImageTag: 0.4.30
751751
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
752752
icon: postgresql.svg
753753
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6854,7 +6854,7 @@
68546854
supportsNormalization: false
68556855
supportsDBT: false
68566856
supported_destination_sync_modes: []
6857-
- dockerImage: "airbyte/source-postgres:0.4.29"
6857+
- dockerImage: "airbyte/source-postgres:0.4.30"
68586858
spec:
68596859
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
68606860
connectionSpecification:

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.29
19+
LABEL io.airbyte.version=0.4.30
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+9-57
Original file line numberDiff line numberDiff line change
@@ -301,65 +301,17 @@ public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase
301301
final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator = connection -> {
302302
final PreparedStatement ps = connection.prepareStatement(
303303
"""
304-
SELECT DISTINCT table_catalog,
305-
table_schema,
306-
table_name,
307-
privilege_type
308-
FROM information_schema.table_privileges
309-
WHERE grantee = ?
310-
AND privilege_type = 'SELECT'
311-
UNION ALL
312-
SELECT r.rolname AS table_catalog,
313-
n.nspname AS table_schema,
314-
c.relname AS table_name,
315-
-- the initial query is supposed to get a SELECT type. Since we use a UNION query
316-
-- to get Views that we can read (i.e. select) - then lets fill this columns with SELECT
317-
-- value to keep the backward-compatibility
318-
COALESCE ('SELECT') AS privilege_type
304+
SELECT nspname as table_schema,
305+
relname as table_name
319306
FROM pg_class c
320-
JOIN pg_namespace n
321-
ON n.oid = relnamespace
322-
JOIN pg_roles r
323-
ON r.oid = relowner,
324-
Unnest(COALESCE(relacl::text[], Format('{%s=arwdDxt/%s}', rolname, rolname)::text[])) acl,
325-
Regexp_split_to_array(acl, '=|/') s
326-
WHERE r.rolname = ?
327-
AND (
328-
nspname = 'public'
329-
OR nspname = ?)
330-
-- 'm' means Materialized View
331-
AND c.relkind = 'm'
332-
AND (
333-
-- all grants
334-
c.relacl IS NULL
335-
-- read grant
336-
OR s[2] = 'r')
337-
UNION
338-
SELECT DISTINCT table_catalog,
339-
table_schema,
340-
table_name,
341-
privilege_type
342-
FROM information_schema.table_privileges p
343-
JOIN information_schema.applicable_roles r ON p.grantee = r.role_name
344-
WHERE r.grantee in
345-
(WITH RECURSIVE membership_tree(grpid, userid) AS (
346-
SELECT pg_roles.oid, pg_roles.oid
347-
FROM pg_roles WHERE oid = (select oid from pg_roles where rolname=?)
348-
UNION ALL
349-
SELECT m_1.roleid, t_1.userid
350-
FROM pg_auth_members m_1, membership_tree t_1
351-
WHERE m_1.member = t_1.grpid
352-
)
353-
SELECT DISTINCT m.rolname AS grpname
354-
FROM membership_tree t, pg_roles r, pg_roles m
355-
WHERE t.grpid = m.oid AND t.userid = r.oid)
356-
AND privilege_type = 'SELECT';
307+
JOIN pg_namespace n on c.relnamespace = n.oid
308+
WHERE has_table_privilege(c.oid, 'SELECT')
309+
-- r = ordinary table, i = index, S = sequence, t = TOAST table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, I = partitioned index
310+
AND relkind in ('r', 'm', 'v', 't', 'f', 'p')
311+
and ((? is null) OR nspname = ?)
357312
""");
358-
final String username = getUsername(database.getDatabaseConfig());
359-
ps.setString(1, username);
360-
ps.setString(2, username);
361-
ps.setString(3, username);
362-
ps.setString(4, username);
313+
ps.setString(1, schema);
314+
ps.setString(2, schema);
363315
return ps;
364316
};
365317

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java

+95
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,101 @@ void testDiscoverRecursiveRolePermissions() throws Exception {
337337
}
338338
}
339339

340+
@Test
341+
void testDiscoverDifferentGrantAvailability() throws Exception {
342+
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
343+
db.start();
344+
final JsonNode config = getConfig(db);
345+
try (final DSLContext dslContext = getDslContext(config)) {
346+
final Database database = new Database(dslContext);
347+
database.query(ctx -> {
348+
ctx.fetch("create table not_granted_table_name_1(column_1 integer);");
349+
ctx.fetch("create table not_granted_table_name_2(column_1 integer);");
350+
ctx.fetch("create table not_granted_table_name_3(column_1 integer);");
351+
ctx.fetch("create table table_granted_by_role(column_1 integer);");
352+
ctx.fetch("create table test_table_granted_directly(column_1 integer);");
353+
ctx.fetch("create table table_granted_by_role_with_options(column_1 integer);");
354+
ctx.fetch("create table test_table_granted_directly_with_options(column_1 integer);");
355+
356+
ctx.fetch("create materialized view not_granted_mv_name_1 as SELECT not_granted_table_name_1.column_1 FROM not_granted_table_name_1;");
357+
ctx.fetch("create materialized view not_granted_mv_name_2 as SELECT not_granted_table_name_2.column_1 FROM not_granted_table_name_2;");
358+
ctx.fetch("create materialized view not_granted_mv_name_3 as SELECT not_granted_table_name_3.column_1 FROM not_granted_table_name_3;");
359+
ctx.fetch("create materialized view mv_granted_by_role as SELECT table_granted_by_role.column_1 FROM table_granted_by_role;");
360+
ctx.fetch(
361+
"create materialized view test_mv_granted_directly as SELECT test_table_granted_directly.column_1 FROM test_table_granted_directly;");
362+
ctx.fetch(
363+
"create materialized view mv_granted_by_role_with_options as SELECT table_granted_by_role_with_options.column_1 FROM table_granted_by_role_with_options;");
364+
ctx.fetch(
365+
"create materialized view test_mv_granted_directly_with_options as SELECT test_table_granted_directly_with_options.column_1 FROM test_table_granted_directly_with_options;");
366+
367+
ctx.fetch("create view not_granted_view_name_1(column_1) as SELECT not_granted_table_name_1.column_1 FROM not_granted_table_name_1;");
368+
ctx.fetch("create view not_granted_view_name_2(column_1) as SELECT not_granted_table_name_2.column_1 FROM not_granted_table_name_2;");
369+
ctx.fetch("create view not_granted_view_name_3(column_1) as SELECT not_granted_table_name_3.column_1 FROM not_granted_table_name_3;");
370+
ctx.fetch("create view view_granted_by_role(column_1) as SELECT table_granted_by_role.column_1 FROM table_granted_by_role;");
371+
ctx.fetch(
372+
"create view test_view_granted_directly(column_1) as SELECT test_table_granted_directly.column_1 FROM test_table_granted_directly;");
373+
ctx.fetch(
374+
"create view view_granted_by_role_with_options(column_1) as SELECT table_granted_by_role_with_options.column_1 FROM table_granted_by_role_with_options;");
375+
ctx.fetch(
376+
"create view test_view_granted_directly_with_options(column_1) as SELECT test_table_granted_directly_with_options.column_1 FROM test_table_granted_directly_with_options;");
377+
378+
ctx.fetch("create role test_role;");
379+
380+
ctx.fetch("grant delete on not_granted_table_name_2 to test_role;");
381+
ctx.fetch("grant delete on not_granted_mv_name_2 to test_role;");
382+
ctx.fetch("grant delete on not_granted_view_name_2 to test_role;");
383+
384+
ctx.fetch("grant select on table_granted_by_role to test_role;");
385+
ctx.fetch("grant select on mv_granted_by_role to test_role;");
386+
ctx.fetch("grant select on view_granted_by_role to test_role;");
387+
388+
ctx.fetch("grant select on table_granted_by_role_with_options to test_role with grant option;");
389+
ctx.fetch("grant select on mv_granted_by_role_with_options to test_role with grant option;");
390+
ctx.fetch("grant select on view_granted_by_role_with_options to test_role with grant option;");
391+
392+
ctx.fetch("create user new_test_user;");
393+
ctx.fetch("ALTER USER new_test_user WITH PASSWORD 'new_pass';");
394+
ctx.fetch("GRANT CONNECT ON DATABASE test TO new_test_user;");
395+
396+
ctx.fetch("grant test_role to new_test_user;");
397+
398+
ctx.fetch("grant delete on not_granted_table_name_3 to new_test_user;");
399+
ctx.fetch("grant delete on not_granted_mv_name_3 to new_test_user;");
400+
ctx.fetch("grant delete on not_granted_view_name_3 to new_test_user;");
401+
402+
ctx.fetch("grant select on test_table_granted_directly to new_test_user;");
403+
ctx.fetch("grant select on test_mv_granted_directly to new_test_user;");
404+
ctx.fetch("grant select on test_view_granted_directly to new_test_user;");
405+
406+
ctx.fetch("grant select on test_table_granted_directly_with_options to test_role with grant option;");
407+
ctx.fetch("grant select on test_mv_granted_directly_with_options to test_role with grant option;");
408+
ctx.fetch("grant select on test_view_granted_directly_with_options to test_role with grant option;");
409+
return null;
410+
});
411+
}
412+
413+
AirbyteCatalog actual = new PostgresSource().discover(getConfig(db, "new_test_user", "new_pass"));
414+
Set<String> tableNames = actual.getStreams().stream().map(stream -> stream.getName()).collect(Collectors.toSet());
415+
Set<String> expectedVisibleNames = Sets.newHashSet(
416+
"table_granted_by_role",
417+
"table_granted_by_role_with_options",
418+
"test_table_granted_directly",
419+
"test_table_granted_directly_with_options",
420+
"mv_granted_by_role",
421+
"mv_granted_by_role_with_options",
422+
"test_mv_granted_directly",
423+
"test_mv_granted_directly_with_options",
424+
"test_view_granted_directly",
425+
"test_view_granted_directly_with_options",
426+
"view_granted_by_role",
427+
"view_granted_by_role_with_options");
428+
429+
assertEquals(tableNames, expectedVisibleNames);
430+
431+
db.stop();
432+
}
433+
}
434+
340435
@Test
341436
void testReadSuccess() throws Exception {
342437
final ConfiguredAirbyteCatalog configuredCatalog =

0 commit comments

Comments
 (0)