4
4
5
5
package io .airbyte .integrations .io .airbyte .integration_tests .sources ;
6
6
7
+ import static org .junit .jupiter .api .Assertions .assertFalse ;
8
+ import static org .junit .jupiter .api .Assertions .assertTrue ;
9
+
7
10
import com .fasterxml .jackson .databind .JsonNode ;
8
11
import com .google .common .collect .ImmutableMap ;
9
12
import com .google .common .collect .Lists ;
16
19
import io .airbyte .integrations .standardtest .source .SourceAcceptanceTest ;
17
20
import io .airbyte .integrations .standardtest .source .TestDestinationEnv ;
18
21
import io .airbyte .integrations .util .HostPortResolver ;
22
+ import io .airbyte .protocol .models .AirbyteCatalog ;
23
+ import io .airbyte .protocol .models .AirbyteRecordMessage ;
19
24
import io .airbyte .protocol .models .CatalogHelpers ;
20
25
import io .airbyte .protocol .models .ConfiguredAirbyteCatalog ;
21
26
import io .airbyte .protocol .models .ConfiguredAirbyteStream ;
24
29
import io .airbyte .protocol .models .Field ;
25
30
import io .airbyte .protocol .models .JsonSchemaType ;
26
31
import io .airbyte .protocol .models .SyncMode ;
32
+ import java .sql .SQLException ;
27
33
import java .util .HashMap ;
28
34
import java .util .List ;
29
35
import org .jooq .DSLContext ;
30
36
import org .jooq .SQLDialect ;
37
+ import org .junit .jupiter .api .Test ;
31
38
import org .testcontainers .containers .PostgreSQLContainer ;
32
39
33
40
public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {
34
41
35
42
private static final String STREAM_NAME = "public.id_and_name" ;
36
43
private static final String STREAM_NAME2 = "public.starships" ;
37
44
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview" ;
45
+ public static final String LIMIT_PERMISSION_SCHEMA = "limit_perm_schema" ;
46
+ public static final String LIMIT_PERMISSION_ROLE = "limit_perm_role" ;
47
+ public static final String LIMIT_PERMISSION_ROLE_PASSWORD = "test" ;
38
48
39
49
private PostgreSQLContainer <?> container ;
40
50
private JsonNode config ;
51
+ private Database database ;
52
+ private ConfiguredAirbyteCatalog configCatalog ;
41
53
42
54
@ Override
43
55
protected void setupEnvironment (final TestDestinationEnv environment ) throws Exception {
44
56
container = new PostgreSQLContainer <>("postgres:13-alpine" );
45
57
container .start ();
46
- final JsonNode replicationMethod = Jsons .jsonNode (ImmutableMap .builder ()
47
- .put ("method" , "Standard" )
48
- .build ());
49
- config = Jsons .jsonNode (ImmutableMap .builder ()
50
- .put (JdbcUtils .HOST_KEY , HostPortResolver .resolveHost (container ))
51
- .put (JdbcUtils .PORT_KEY , HostPortResolver .resolvePort (container ))
52
- .put (JdbcUtils .DATABASE_KEY , container .getDatabaseName ())
53
- .put (JdbcUtils .SCHEMAS_KEY , Jsons .jsonNode (List .of ("public" )))
54
- .put (JdbcUtils .USERNAME_KEY , container .getUsername ())
55
- .put (JdbcUtils .PASSWORD_KEY , container .getPassword ())
56
- .put (JdbcUtils .SSL_KEY , false )
57
- .put ("replication_method" , replicationMethod )
58
- .build ());
59
-
58
+ String username = container .getUsername ();
59
+ String password = container .getPassword ();
60
+ List <String > schemas = List .of ("public" );
61
+ config = getConfig (username , password , schemas );
60
62
try (final DSLContext dslContext = DSLContextFactory .create (
61
63
config .get (JdbcUtils .USERNAME_KEY ).asText (),
62
64
config .get (JdbcUtils .PASSWORD_KEY ).asText (),
@@ -66,7 +68,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
66
68
container .getFirstMappedPort (),
67
69
config .get (JdbcUtils .DATABASE_KEY ).asText ()),
68
70
SQLDialect .POSTGRES )) {
69
- final Database database = new Database (dslContext );
71
+ database = new Database (dslContext );
70
72
71
73
database .query (ctx -> {
72
74
ctx .fetch ("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));" );
@@ -76,9 +78,26 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
76
78
ctx .fetch ("CREATE MATERIALIZED VIEW testview AS select * from id_and_name where id = '2';" );
77
79
return null ;
78
80
});
81
+ configCatalog = getCommonConfigCatalog ();
79
82
}
80
83
}
81
84
85
+ private JsonNode getConfig (String username , String password , List <String > schemas ) {
86
+ final JsonNode replicationMethod = Jsons .jsonNode (ImmutableMap .builder ()
87
+ .put ("method" , "Standard" )
88
+ .build ());
89
+ return Jsons .jsonNode (ImmutableMap .builder ()
90
+ .put (JdbcUtils .HOST_KEY , HostPortResolver .resolveHost (container ))
91
+ .put (JdbcUtils .PORT_KEY , HostPortResolver .resolvePort (container ))
92
+ .put (JdbcUtils .DATABASE_KEY , container .getDatabaseName ())
93
+ .put (JdbcUtils .SCHEMAS_KEY , Jsons .jsonNode (schemas ))
94
+ .put (JdbcUtils .USERNAME_KEY , username )
95
+ .put (JdbcUtils .PASSWORD_KEY , password )
96
+ .put (JdbcUtils .SSL_KEY , false )
97
+ .put ("replication_method" , replicationMethod )
98
+ .build ());
99
+ }
100
+
82
101
@ Override
83
102
protected void tearDown (final TestDestinationEnv testEnv ) {
84
103
container .close ();
@@ -101,6 +120,71 @@ protected JsonNode getConfig() {
101
120
102
121
@ Override
103
122
protected ConfiguredAirbyteCatalog getConfiguredCatalog () {
123
+ return configCatalog ;
124
+ }
125
+
126
+ @ Override
127
+ protected JsonNode getState () {
128
+ return Jsons .jsonNode (new HashMap <>());
129
+ }
130
+
131
+ @ Override
132
+ protected boolean supportsPerStream () {
133
+ return true ;
134
+ }
135
+
136
+ @ Test
137
+ public void testFullRefreshWithRevokingSchemaPermissions () throws Exception {
138
+ prepareEnvForUserWithoutPermissions (database );
139
+
140
+ config = getConfig (LIMIT_PERMISSION_ROLE , LIMIT_PERMISSION_ROLE_PASSWORD , List .of (LIMIT_PERMISSION_SCHEMA ));
141
+ final ConfiguredAirbyteCatalog configuredCatalog = getLimitPermissionConfiguredCatalog ();
142
+
143
+ final List <AirbyteRecordMessage > fullRefreshRecords = filterRecords (runRead (configuredCatalog ));
144
+ final String assertionMessage = "Expected records after full refresh sync for user with schema permission" ;
145
+ assertFalse (fullRefreshRecords .isEmpty (), assertionMessage );
146
+
147
+ revokeSchemaPermissions (database );
148
+
149
+ final List <AirbyteRecordMessage > lessPermFullRefreshRecords = filterRecords (runRead (configuredCatalog ));
150
+ final String assertionMessageWithoutPermission = "Expected no records after full refresh sync for user without schema permission" ;
151
+ assertTrue (lessPermFullRefreshRecords .isEmpty (), assertionMessageWithoutPermission );
152
+
153
+ }
154
+
155
+ @ Test
156
+ public void testDiscoverWithRevokingSchemaPermissions () throws Exception {
157
+ prepareEnvForUserWithoutPermissions (database );
158
+ revokeSchemaPermissions (database );
159
+ config = getConfig (LIMIT_PERMISSION_ROLE , LIMIT_PERMISSION_ROLE_PASSWORD , List .of (LIMIT_PERMISSION_SCHEMA ));
160
+
161
+ runDiscover ();
162
+ AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog ();
163
+ final String assertionMessageWithoutPermission = "Expected no streams after discover for user without schema permissions" ;
164
+ assertTrue (lastPersistedCatalogSecond .getStreams ().isEmpty (), assertionMessageWithoutPermission );
165
+ }
166
+
167
+ private void revokeSchemaPermissions (Database database ) throws SQLException {
168
+ database .query (ctx -> {
169
+ ctx .fetch (String .format ("REVOKE USAGE ON schema %s FROM %s;" , LIMIT_PERMISSION_SCHEMA , LIMIT_PERMISSION_ROLE ));
170
+ return null ;
171
+ });
172
+ }
173
+
174
+ private void prepareEnvForUserWithoutPermissions (Database database ) throws SQLException {
175
+ database .query (ctx -> {
176
+ ctx .fetch (String .format ("CREATE ROLE %s WITH LOGIN PASSWORD '%s';" , LIMIT_PERMISSION_ROLE , LIMIT_PERMISSION_ROLE_PASSWORD ));
177
+ ctx .fetch (String .format ("CREATE SCHEMA %s;" , LIMIT_PERMISSION_SCHEMA ));
178
+ ctx .fetch (String .format ("GRANT CONNECT ON DATABASE test TO %s;" , LIMIT_PERMISSION_ROLE ));
179
+ ctx .fetch (String .format ("GRANT USAGE ON schema %s TO %s;" , LIMIT_PERMISSION_SCHEMA , LIMIT_PERMISSION_ROLE ));
180
+ ctx .fetch (String .format ("CREATE TABLE %s.id_and_name(id INTEGER, name VARCHAR(200));" , LIMIT_PERMISSION_SCHEMA ));
181
+ ctx .fetch (String .format ("INSERT INTO %s.id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');" , LIMIT_PERMISSION_SCHEMA ));
182
+ ctx .fetch (String .format ("GRANT SELECT ON table %s.id_and_name TO %s;" , LIMIT_PERMISSION_SCHEMA , LIMIT_PERMISSION_ROLE ));
183
+ return null ;
184
+ });
185
+ }
186
+
187
+ private ConfiguredAirbyteCatalog getCommonConfigCatalog () {
104
188
return new ConfiguredAirbyteCatalog ().withStreams (Lists .newArrayList (
105
189
new ConfiguredAirbyteStream ()
106
190
.withSyncMode (SyncMode .INCREMENTAL )
@@ -131,14 +215,17 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
131
215
.withSupportedSyncModes (Lists .newArrayList (SyncMode .FULL_REFRESH , SyncMode .INCREMENTAL )))));
132
216
}
133
217
134
- @ Override
135
- protected JsonNode getState () {
136
- return Jsons .jsonNode (new HashMap <>());
137
- }
138
-
139
- @ Override
140
- protected boolean supportsPerStream () {
141
- return true ;
218
+ private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog () {
219
+ return new ConfiguredAirbyteCatalog ().withStreams (Lists .newArrayList (
220
+ new ConfiguredAirbyteStream ()
221
+ .withSyncMode (SyncMode .INCREMENTAL )
222
+ .withCursorField (Lists .newArrayList ("id" ))
223
+ .withDestinationSyncMode (DestinationSyncMode .APPEND )
224
+ .withStream (CatalogHelpers .createAirbyteStream (
225
+ LIMIT_PERMISSION_SCHEMA + "." + "id_and_name" ,
226
+ Field .of ("id" , JsonSchemaType .NUMBER ),
227
+ Field .of ("name" , JsonSchemaType .STRING ))
228
+ .withSupportedSyncModes (Lists .newArrayList (SyncMode .FULL_REFRESH , SyncMode .INCREMENTAL )))));
142
229
}
143
230
144
231
}
0 commit comments