Skip to content

Source postgres: fix schema permission issue #19024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 2, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase
FROM pg_class c
JOIN pg_namespace n on c.relnamespace = n.oid
WHERE has_table_privilege(c.oid, 'SELECT')
AND has_schema_privilege(current_user, nspname, 'USAGE')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

-- r = ordinary table, i = index, S = sequence, t = TOAST table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, I = partitioned index
AND relkind in ('r', 'm', 'v', 't', 'f', 'p')
and ((? is null) OR nspname = ?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -16,6 +19,7 @@
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -24,39 +28,36 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
public static final String LIMIT_PERMISSION_SCHEMA = "limit_perm_schema";
public static final String LIMIT_PERMISSION_ROLE = "limit_perm_role";
public static final String LIMIT_PERMISSION_ROLE_PASSWORD = "test";

private PostgreSQLContainer<?> container;
private JsonNode config;
private Database database;
private ConfiguredAirbyteCatalog configCatalog;

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.SCHEMAS_KEY, Jsons.jsonNode(List.of("public")))
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", replicationMethod)
.build());

String username = container.getUsername();
String password = container.getPassword();
List<String> schemas = List.of("public");
config = getConfig(username, password, schemas);
try (final DSLContext dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText(),
Expand All @@ -66,7 +67,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
container.getFirstMappedPort(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.POSTGRES)) {
final Database database = new Database(dslContext);
database = new Database(dslContext);

database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
Expand All @@ -76,9 +77,26 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
ctx.fetch("CREATE MATERIALIZED VIEW testview AS select * from id_and_name where id = '2';");
return null;
});
configCatalog = getCommonConfigCatalog();
}
}

private JsonNode getConfig(String username, String password, List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.SCHEMAS_KEY, Jsons.jsonNode(schemas))
.put(JdbcUtils.USERNAME_KEY, username)
.put(JdbcUtils.PASSWORD_KEY, password)
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", replicationMethod)
.build());
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
Expand All @@ -101,6 +119,59 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return configCatalog;
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

@Test
public void testFullRefreshWithRevokingSchemaPermissions() throws Exception {
prepareEnvForUserWithoutPermissions(database);

config = getConfig(LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD, List.of(LIMIT_PERMISSION_SCHEMA));
final ConfiguredAirbyteCatalog configuredCatalog = getLimitPermissionConfiguredCatalog();

final List<AirbyteRecordMessage> fullRefreshRecords = filterRecords(runRead(configuredCatalog));
final String assertionMessage = "Expected records after full refresh sync for user with schema permission";
assertFalse(fullRefreshRecords.isEmpty(), assertionMessage);

revokeSchemaPermissions(database);

final List<AirbyteRecordMessage> lessPermFullRefreshRecords = filterRecords(runRead(configuredCatalog));
final String assertionMessageWithoutPermission = "Expected no records after full refresh sync for user without schema permission";
assertTrue(lessPermFullRefreshRecords.isEmpty(), assertionMessageWithoutPermission);

}

private void revokeSchemaPermissions(Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("REVOKE USAGE ON schema %s FROM %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private void prepareEnvForUserWithoutPermissions(Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("CREATE ROLE %s WITH LOGIN PASSWORD '%s';", LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD));
ctx.fetch(String.format("CREATE SCHEMA %s;", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("GRANT CONNECT ON DATABASE test TO %s;", LIMIT_PERMISSION_ROLE));
ctx.fetch(String.format("GRANT USAGE ON schema %s TO %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
ctx.fetch(String.format("CREATE TABLE %s.id_and_name(id INTEGER, name VARCHAR(200));", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("INSERT INTO %s.id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", LIMIT_PERMISSION_SCHEMA));
ctx.fetch(String.format("GRANT SELECT ON table %s.id_and_name TO %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private ConfiguredAirbyteCatalog getCommonConfigCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
Expand Down Expand Up @@ -131,14 +202,17 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
LIMIT_PERMISSION_SCHEMA + "." + "id_and_name",
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

}