Skip to content

Commit 6210046

Browse files
committed
try-with-resources for iceberg code
1 parent 729618f commit 6210046

File tree

1 file changed

+29
-29
lines changed

1 file changed

+29
-29
lines changed

java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -117,42 +117,42 @@ public void validate(
117117

118118
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
119119
Configuration hadoopConf = createHadoopConf(schema, tableProperties);
120-
HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
121-
Table icebergTable;
122-
try {
123-
icebergTable = hadoopCatalog.loadTable(tableIdentifier);
124-
hadoopCatalog.close();
120+
121+
try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) {
122+
123+
Table icebergTable = hadoopCatalog.loadTable(tableIdentifier);
124+
125+
// Check that all columns in tableSchema exist in the iceberg table.
126+
for (String columnName : tableSchema.getColumnNames()) {
127+
if (icebergTable.schema().findField(columnName) == null) {
128+
throw Status.FAILED_PRECONDITION
129+
.withDescription(
130+
String.format(
131+
"table schema does not match. Column %s not found in iceberg table",
132+
columnName))
133+
.asRuntimeException();
134+
}
135+
}
136+
137+
// Check that all required columns in the iceberg table exist in tableSchema.
138+
Set<String> columnNames = Set.of(tableSchema.getColumnNames());
139+
for (Types.NestedField column : icebergTable.schema().columns()) {
140+
if (column.isRequired() && !columnNames.contains(column.name())) {
141+
throw Status.FAILED_PRECONDITION
142+
.withDescription(
143+
String.format("missing a required field %s", column.name()))
144+
.asRuntimeException();
145+
}
146+
}
147+
125148
} catch (Exception e) {
126-
throw Status.FAILED_PRECONDITION
149+
throw Status.INTERNAL
127150
.withDescription(
128151
String.format("failed to load iceberg table: %s", e.getMessage()))
129152
.withCause(e)
130153
.asRuntimeException();
131154
}
132155

133-
// Check that all columns in tableSchema exist in the iceberg table.
134-
for (String columnName : tableSchema.getColumnNames()) {
135-
if (icebergTable.schema().findField(columnName) == null) {
136-
throw Status.FAILED_PRECONDITION
137-
.withDescription(
138-
String.format(
139-
"table schema does not match. Column %s not found in iceberg table",
140-
columnName))
141-
.asRuntimeException();
142-
}
143-
}
144-
145-
// Check that all required columns in the iceberg table exist in tableSchema.
146-
Set<String> columnNames = Set.of(tableSchema.getColumnNames());
147-
for (Types.NestedField column : icebergTable.schema().columns()) {
148-
if (column.isRequired() && !columnNames.contains(column.name())) {
149-
throw Status.FAILED_PRECONDITION
150-
.withDescription(
151-
String.format("missing a required field %s", column.name()))
152-
.asRuntimeException();
153-
}
154-
}
155-
156156
if (!mode.equals("append-only") && !mode.equals("upsert")) {
157157
throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException();
158158
}

0 commit comments

Comments
 (0)