Skip to content

Commit c5f8802

Browse files
alafanechereetsybaev
authored andcommitted
🎉 Source redshift: implement privileges check (#9744)
1 parent e4f650b commit c5f8802

File tree

9 files changed

+87
-16
lines changed

9 files changed

+87
-16
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@
641641
- name: Redshift
642642
sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
643643
dockerRepository: airbyte/source-redshift
644-
dockerImageTag: 0.3.8
644+
dockerImageTag: 0.3.9
645645
documentationUrl: https://docs.airbyte.io/integrations/sources/redshift
646646
icon: redshift.svg
647647
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6631,9 +6631,9 @@
66316631
supportsNormalization: false
66326632
supportsDBT: false
66336633
supported_destination_sync_modes: []
6634-
- dockerImage: "airbyte/source-redshift:0.3.8"
6634+
- dockerImage: "airbyte/source-redshift:0.3.9"
66356635
spec:
6636-
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
6636+
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
66376637
connectionSpecification:
66386638
$schema: "http://json-schema.org/draft-07/schema#"
66396639
title: "Redshift Source Spec"

airbyte-integrations/connectors/source-redshift/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-redshift
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.8
19+
LABEL io.airbyte.version=0.3.9
2020
LABEL io.airbyte.name=airbyte/source-redshift

airbyte-integrations/connectors/source-redshift/src/main/java/io/airbyte/integrations/source/redshift/RedshiftSource.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
import io.airbyte.integrations.base.IntegrationRunner;
1313
import io.airbyte.integrations.base.Source;
1414
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
15+
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
1516
import io.airbyte.integrations.source.relationaldb.TableInfo;
1617
import io.airbyte.protocol.models.CommonField;
1718
import java.sql.JDBCType;
19+
import java.sql.PreparedStatement;
20+
import java.sql.SQLException;
1821
import java.util.ArrayList;
22+
import java.util.HashSet;
1923
import java.util.List;
2024
import java.util.Set;
2125
import org.slf4j.Logger;
@@ -93,6 +97,27 @@ public Set<String> getExcludedInternalNameSpaces() {
9397
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
9498
}
9599

100+
@Override
101+
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
102+
return new HashSet<>(database.bufferedResultSetQuery(
103+
connection -> {
104+
connection.setAutoCommit(true);
105+
final PreparedStatement ps = connection.prepareStatement(
106+
"SELECT schemaname, tablename "
107+
+ "FROM pg_tables "
108+
+ "WHERE has_table_privilege(schemaname||'.'||tablename, 'select') = true AND schemaname = ?;");
109+
ps.setString(1, schema);
110+
return ps.executeQuery();
111+
},
112+
resultSet -> {
113+
final JsonNode json = sourceOperations.rowToJson(resultSet);
114+
return JdbcPrivilegeDto.builder()
115+
.schemaName(json.get("schemaname").asText())
116+
.tableName(json.get("tablename").asText())
117+
.build();
118+
}));
119+
}
120+
96121
public static void main(final String[] args) throws Exception {
97122
final Source source = new RedshiftSource();
98123
LOGGER.info("starting source: {}", RedshiftSource.class);

airbyte-integrations/connectors/source-redshift/src/main/resources/spec.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/redshift",
2+
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
33
"connectionSpecification": {
44
"$schema": "http://json-schema.org/draft-07/schema#",
55
"title": "Redshift Source Spec",

airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ private static JsonNode getStaticConfig() {
2929
@BeforeEach
3030
public void setup() throws Exception {
3131
config = getStaticConfig();
32-
3332
super.setup();
3433
}
3534

airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSourceAcceptanceTest.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
4040
// This test case expects an active redshift cluster that is useable from outside of vpc
4141
protected ObjectNode config;
4242
protected JdbcDatabase database;
43+
protected String testUserName;
44+
protected String testUserPassword;
4345
protected String schemaName;
4446
protected String schemaToIgnore;
4547
protected String streamName;
@@ -53,22 +55,30 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
5355
config = getStaticConfig();
5456

5557
database = createDatabase(config);
56-
58+
testUserName = "foo";
59+
testUserPassword = "BarBarBar1&";
60+
createTestUser(database, config, testUserName, testUserPassword);
5761
schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
5862
schemaToIgnore = schemaName + "shouldIgnore";
63+
streamName = "customer";
5964

6065
// limit the connection to one schema only
6166
config = config.set("schemas", Jsons.jsonNode(List.of(schemaName)));
6267

68+
// use test user user
69+
config = config.set("username", Jsons.jsonNode(testUserName));
70+
config = config.set("password", Jsons.jsonNode(testUserPassword));
71+
6372
// create a test data
64-
createTestData(database, schemaName);
73+
createTestData(database, schemaName, streamName, testUserName, true);
74+
createTestData(database, schemaName, "not_readable", testUserName, false);
6575

6676
// create a schema with data that will not be used for testing, but would be used to check schema
6777
// filtering. This one should not be visible in results
68-
createTestData(database, schemaToIgnore);
78+
createTestData(database, schemaToIgnore, streamName, testUserName, true);
6979
}
7080

71-
protected static JdbcDatabase createDatabase(final JsonNode config) {
81+
protected JdbcDatabase createDatabase(final JsonNode config) {
7282
return Databases.createJdbcDatabase(
7383
config.get("username").asText(),
7484
config.get("password").asText(),
@@ -79,15 +89,30 @@ protected static JdbcDatabase createDatabase(final JsonNode config) {
7989
RedshiftSource.DRIVER_CLASS);
8090
}
8191

82-
protected void createTestData(final JdbcDatabase database, final String schemaName)
92+
protected void createTestUser(final JdbcDatabase database, final JsonNode config, final String testUserName, final String testUserPassword)
93+
throws SQLException {
94+
final String createTestUserQuery = String.format("CREATE USER %s PASSWORD '%s'", testUserName, testUserPassword);
95+
database.execute(connection -> {
96+
connection.createStatement().execute(createTestUserQuery);
97+
});
98+
final String grantSelectOnPgTablesQuery = String.format("GRANT SELECT ON TABLE pg_tables TO %s ", testUserName);
99+
database.execute(connection -> {
100+
connection.createStatement().execute(grantSelectOnPgTablesQuery);
101+
});
102+
}
103+
104+
protected void createTestData(final JdbcDatabase database,
105+
final String schemaName,
106+
final String tableName,
107+
final String testUserName,
108+
final Boolean isReadableByTestUser)
83109
throws SQLException {
84-
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
110+
final String createSchemaQuery = String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName);
85111
database.execute(connection -> {
86112
connection.createStatement().execute(createSchemaQuery);
87113
});
88114

89-
streamName = "customer";
90-
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, streamName);
115+
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
91116
final String createTestTable =
92117
String.format(
93118
"CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n",
@@ -101,6 +126,22 @@ protected void createTestData(final JdbcDatabase database, final String schemaNa
101126
database.execute(connection -> {
102127
connection.createStatement().execute(insertTestData);
103128
});
129+
130+
if (!isReadableByTestUser) {
131+
final String revokeSelect = String.format("REVOKE SELECT ON TABLE %s FROM %s;\n", fqTableName, testUserName);
132+
database.execute(connection -> {
133+
connection.createStatement().execute(revokeSelect);
134+
});
135+
} else {
136+
final String grantUsageQuery = String.format("GRANT USAGE ON SCHEMA %s TO %s;\n", schemaName, testUserName);
137+
database.execute(connection -> {
138+
connection.createStatement().execute(grantUsageQuery);
139+
});
140+
final String grantSelectQuery = String.format("GRANT SELECT ON TABLE %s TO %s;\n", fqTableName, testUserName);
141+
database.execute(connection -> {
142+
connection.createStatement().execute(grantSelectQuery);
143+
});
144+
}
104145
}
105146

106147
@Override
@@ -109,6 +150,10 @@ protected void tearDown(final TestDestinationEnv testEnv) throws SQLException {
109150
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName)));
110151
database.execute(connection -> connection.createStatement()
111152
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaToIgnore)));
153+
database.execute(connection -> connection.createStatement()
154+
.execute(String.format("REVOKE SELECT ON table pg_tables FROM %s", testUserName)));
155+
database.execute(connection -> connection.createStatement()
156+
.execute(String.format("DROP USER IF EXISTS %s", testUserName)));
112157
}
113158

114159
@Override

airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSslSourceAcceptanceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
public class RedshiftSslSourceAcceptanceTest extends RedshiftSourceAcceptanceTest {
1313

14-
protected static JdbcDatabase createDatabase(final JsonNode config) {
14+
@Override
15+
protected JdbcDatabase createDatabase(final JsonNode config) {
1516
return Databases.createJdbcDatabase(
1617
config.get("username").asText(),
1718
config.get("password").asText(),

docs/integrations/sources/redshift.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ All Redshift connections are encrypted using SSL
5454

5555
| Version | Date | Pull Request | Subject |
5656
| :------ | :-------- | :----- | :------ |
57-
| 0.3 .8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
57+
| 0.3.9 | 2022-02-21 | [9744](https://github.com/airbytehq/airbyte/pull/9744) | List only the tables on which the user has SELECT permissions.
58+
| 0.3.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
5859
| 0.3.7 | 2022-01-26 | [9721](https://github.com/airbytehq/airbyte/pull/9721) | Added schema selection |
5960
| 0.3.6 | 2022-01-20 | [8617](https://github.com/airbytehq/airbyte/pull/8617) | Update connector fields title/description |
6061
| 0.3.5 | 2021-12-24 | [8958](https://github.com/airbytehq/airbyte/pull/8958) | Add support for JdbcType.ARRAY |

0 commit comments

Comments
 (0)