17
17
import static org .assertj .core .api .Assertions .assertThat ;
18
18
import static org .assertj .core .api .Assertions .fail ;
19
19
import static org .junit .Assert .assertEquals ;
20
+ import static org .junit .Assert .assertThrows ;
20
21
21
22
import com .risingwave .proto .ConnectorServiceProto ;
22
- import io .grpc .Grpc ;
23
- import io .grpc .InsecureChannelCredentials ;
24
- import io .grpc .Server ;
25
- import io .grpc .ServerBuilder ;
23
+ import com .risingwave .proto .Data ;
24
+ import io .grpc .*;
26
25
import java .io .IOException ;
27
26
import java .sql .Connection ;
28
27
import java .sql .ResultSet ;
@@ -41,7 +40,7 @@ public class PostgresSourceTest {
41
40
private static final Logger LOG = LoggerFactory .getLogger (PostgresSourceTest .class .getName ());
42
41
43
42
private static final PostgreSQLContainer <?> pg =
44
- new PostgreSQLContainer <>("postgres:12.3 -alpine" )
43
+ new PostgreSQLContainer <>("postgres:15 -alpine" )
45
44
.withDatabaseName ("test" )
46
45
.withUsername ("postgres" )
47
46
.withCommand ("postgres -c wal_level=logical -c max_wal_senders=10" );
@@ -145,6 +144,69 @@ public void testLines() throws InterruptedException, SQLException {
145
144
connection .close ();
146
145
}
147
146
147
+ // test whether validation catches permission errors
148
+ @ Test
149
+ public void testPermissionCheck () {
150
+ Connection connection = SourceTestClient .connect (pgDataSource );
151
+ String query =
152
+ "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))" ;
153
+ SourceTestClient .performQuery (connection , query );
154
+ // create a partial publication, check whether error is reported
155
+ query = "CREATE PUBLICATION dbz_publication FOR TABLE orders (o_key)" ;
156
+ SourceTestClient .performQuery (connection , query );
157
+ ConnectorServiceProto .TableSchema tableSchema =
158
+ ConnectorServiceProto .TableSchema .newBuilder ()
159
+ .addColumns (
160
+ ConnectorServiceProto .TableSchema .Column .newBuilder ()
161
+ .setName ("o_key" )
162
+ .setDataType (Data .DataType .TypeName .INT64 )
163
+ .build ())
164
+ .addColumns (
165
+ ConnectorServiceProto .TableSchema .Column .newBuilder ()
166
+ .setName ("o_val" )
167
+ .setDataType (Data .DataType .TypeName .INT32 )
168
+ .build ())
169
+ .addPkIndices (0 )
170
+ .build ();
171
+ Iterator <ConnectorServiceProto .GetEventStreamResponse > eventStream1 =
172
+ testClient .getEventStreamValidate (
173
+ pg ,
174
+ ConnectorServiceProto .SourceType .POSTGRES ,
175
+ tableSchema ,
176
+ "test" ,
177
+ "orders" );
178
+ StatusRuntimeException exception1 =
179
+ assertThrows (
180
+ StatusRuntimeException .class ,
181
+ () -> {
182
+ eventStream1 .hasNext ();
183
+ });
184
+ assertEquals (
185
+ exception1 .getMessage (),
186
+ "INVALID_ARGUMENT: INTERNAL: The publication 'dbz_publication' does not cover all necessary columns in table orders" );
187
+ query = "DROP PUBLICATION dbz_publication" ;
188
+ SourceTestClient .performQuery (connection , query );
189
+ // revoke superuser and replication, check if reports error
190
+ query = "ALTER USER " + pg .getUsername () + " nosuperuser noreplication" ;
191
+ SourceTestClient .performQuery (connection , query );
192
+ Iterator <ConnectorServiceProto .GetEventStreamResponse > eventStream2 =
193
+ testClient .getEventStreamValidate (
194
+ pg ,
195
+ ConnectorServiceProto .SourceType .POSTGRES ,
196
+ tableSchema ,
197
+ "test" ,
198
+ "orders" );
199
+ StatusRuntimeException exception2 =
200
+ assertThrows (
201
+ StatusRuntimeException .class ,
202
+ () -> {
203
+ eventStream2 .hasNext ();
204
+ });
205
+ assertEquals (
206
+ exception2 .getMessage (),
207
+ "INVALID_ARGUMENT: INTERNAL: Postgres user must be superuser or replication role to start walsender." );
208
+ }
209
+
148
210
// generates test cases for the risingwave debezium parser
149
211
@ Ignore
150
212
@ Test
0 commit comments