Skip to content

Commit 32f4925

Browse files
authored
fix(connector): add postgres permission checks in validation phase (risingwavelabs#8525)
1 parent ce9e519 commit 32f4925

File tree

3 files changed

+190
-0
lines changed

3 files changed

+190
-0
lines changed

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,159 @@ private void validateDbProperties(
263263
}
264264
}
265265
}
266+
// check whether user is superuser or replication role
267+
try (var stmt =
268+
conn.prepareStatement(sqlStmts.getProperty("postgres.role.check"))) {
269+
stmt.setString(1, props.get(DbzConnectorConfig.USER));
270+
var res = stmt.executeQuery();
271+
while (res.next()) {
272+
if (!res.getBoolean(1)) {
273+
throw new StatusException(
274+
Status.INTERNAL.withDescription(
275+
"Postgres user must be superuser or replication role to start walsender."));
276+
}
277+
}
278+
}
279+
// check whether user has select privilege on table for initial snapshot
280+
try (var stmt =
281+
conn.prepareStatement(
282+
sqlStmts.getProperty("postgres.table_privilege.check"))) {
283+
stmt.setString(1, props.get(DbzConnectorConfig.TABLE_NAME));
284+
stmt.setString(2, props.get(DbzConnectorConfig.USER));
285+
var res = stmt.executeQuery();
286+
while (res.next()) {
287+
if (!res.getBoolean(1)) {
288+
throw new StatusException(
289+
Status.INTERNAL.withDescription(
290+
"Postgres user must have select privilege on table "
291+
+ props.get(
292+
DbzConnectorConfig.TABLE_NAME)));
293+
}
294+
}
295+
}
296+
// check whether publication exists
297+
boolean publicationExists = false;
298+
boolean partialPublication = false;
299+
try (var stmt = conn.createStatement()) {
300+
var res =
301+
stmt.executeQuery(
302+
sqlStmts.getProperty("postgres.publication_att_exists"));
303+
while (res.next()) {
304+
partialPublication = res.getBoolean(1);
305+
}
306+
}
307+
// pg 15 and up supports partial publication of table
308+
// check whether publication covers all columns
309+
if (partialPublication) {
310+
try (var stmt =
311+
conn.prepareStatement(
312+
sqlStmts.getProperty("postgres.publication_att"))) {
313+
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
314+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
315+
var res = stmt.executeQuery();
316+
while (res.next()) {
317+
String[] columnsPub =
318+
(String[]) res.getArray("attnames").getArray();
319+
var sourceSchema = validate.getTableSchema();
320+
for (int i = 0; i < sourceSchema.getColumnsCount(); i++) {
321+
String columnName = sourceSchema.getColumns(i).getName();
322+
if (Arrays.stream(columnsPub).noneMatch(columnName::equals)) {
323+
throw new StatusException(
324+
Status.INTERNAL.withDescription(
325+
"The publication 'dbz_publication' does not cover all necessary columns in table "
326+
+ props.get(
327+
DbzConnectorConfig
328+
.TABLE_NAME)));
329+
}
330+
if (i == sourceSchema.getColumnsCount() - 1) {
331+
publicationExists = true;
332+
}
333+
}
334+
if (publicationExists) {
335+
LOG.info("publication exists");
336+
break;
337+
}
338+
}
339+
}
340+
} else { // check directly whether publication exists
341+
try (var stmt =
342+
conn.prepareStatement(
343+
sqlStmts.getProperty("postgres.publication_cnt"))) {
344+
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
345+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
346+
var res = stmt.executeQuery();
347+
while (res.next()) {
348+
if (res.getInt("count") > 0) {
349+
publicationExists = true;
350+
LOG.info("publication exists");
351+
break;
352+
}
353+
}
354+
}
355+
}
356+
// if publication does not exist, check permission to create publication
357+
if (!publicationExists) {
358+
// check create privilege on database
359+
try (var stmt =
360+
conn.prepareStatement(
361+
sqlStmts.getProperty(
362+
"postgres.database_privilege.check"))) {
363+
stmt.setString(1, props.get(DbzConnectorConfig.USER));
364+
stmt.setString(2, props.get(DbzConnectorConfig.DB_NAME));
365+
stmt.setString(3, props.get(DbzConnectorConfig.USER));
366+
var res = stmt.executeQuery();
367+
while (res.next()) {
368+
if (!res.getBoolean(1)) {
369+
throw new StatusException(
370+
Status.INTERNAL.withDescription(
371+
"Postgres user must have create privilege on database"
372+
+ props.get(
373+
DbzConnectorConfig.DB_NAME)));
374+
}
375+
}
376+
}
377+
// check ownership on table
378+
boolean isTableOwner = false;
379+
String owner = null;
380+
// check if user is owner
381+
try (var stmt =
382+
conn.prepareStatement(
383+
sqlStmts.getProperty("postgres.table_owner"))) {
384+
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
385+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
386+
var res = stmt.executeQuery();
387+
while (res.next()) {
388+
owner = res.getString("tableowner");
389+
if (owner.equals(props.get(DbzConnectorConfig.USER))) {
390+
isTableOwner = true;
391+
break;
392+
}
393+
}
394+
}
395+
// if user is not owner, check if user belongs to owner group
396+
if (!isTableOwner && !owner.isEmpty()) {
397+
try (var stmt =
398+
conn.prepareStatement(
399+
sqlStmts.getProperty("postgres.users_of_group"))) {
400+
stmt.setString(1, owner);
401+
var res = stmt.executeQuery();
402+
while (res.next()) {
403+
String[] users = (String[]) res.getArray("members").getArray();
404+
if (Arrays.stream(users)
405+
.anyMatch(props.get(DbzConnectorConfig.USER)::equals)) {
406+
isTableOwner = true;
407+
break;
408+
}
409+
}
410+
}
411+
}
412+
if (!isTableOwner) {
413+
throw new StatusException(
414+
Status.INTERNAL.withDescription(
415+
"Postgres user must be owner of table "
416+
+ props.get(DbzConnectorConfig.TABLE_NAME)));
417+
}
418+
}
266419
break;
267420
default:
268421
break;

java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,28 @@ postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND ta
88
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
99
postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position
1010
postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?
11+
postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ?
12+
postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ?
13+
postgres.table_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_name = ? AND grantee = ? and privilege_type = 'SELECT'
14+
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
15+
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
16+
postgres.publication_att=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication'
17+
postgres.publication_cnt=SELECT COUNT(*) AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication'
18+
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
19+
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
20+
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
21+
INNER JOIN pg_roles r2 ON r2.oid = am.member \
22+
WHERE r1.rolname = ? \
23+
GROUP BY r1.rolname \
24+
) \
25+
UNION ALL ( \
26+
WITH groups AS (SELECT DISTINCT(UNNEST(m)) AS g FROM base) \
27+
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
28+
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
29+
INNER JOIN pg_roles r2 ON r2.oid = am.member \
30+
INNER JOIN groups ON r1.rolname = groups.g \
31+
GROUP BY r1.rolname \
32+
) \
33+
), \
34+
tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \
35+
SELECT ARRAY_AGG(members) AS members FROM tmp

java/connector-node/risingwave-source-test/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@
1717
</properties>
1818

1919
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.logging.log4j</groupId>
22+
<artifactId>log4j-api</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.apache.logging.log4j</groupId>
26+
<artifactId>log4j-slf4j-impl</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.logging.log4j</groupId>
30+
<artifactId>log4j-core</artifactId>
31+
</dependency>
2032
<dependency>
2133
<groupId>junit</groupId>
2234
<artifactId>junit</artifactId>

0 commit comments

Comments
 (0)