Commit 1f4198b

Added pass through view support

1 parent: 9467f6a
File tree: 3 files changed, +102 -4 lines
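The commit makes BigQueryCatalogExtension implement Spark's ViewCatalog interface and forward every view operation to the delegated session catalog, and adds an integration test that creates a global temporary view on top of a BigQuery-backed table. Below is a minimal sketch of how such an extension is typically wired into a session; the config key is Spark's standard session-catalog override, while the options used by the integration test's createSparkSession() are not part of this diff and are assumed here.

```java
import org.apache.spark.sql.SparkSession;

// A minimal sketch, not the test's actual setup: a CatalogExtension is normally
// installed by overriding Spark's built-in V2 session catalog. Connector-specific
// options (project, credentials, write method, ...) are omitted because they are
// not visible in this diff.
public class ViewPassThroughSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .config(
                "spark.sql.catalog.spark_catalog",
                "com.google.cloud.spark.bigquery.BigQueryCatalogExtension")
            .getOrCreate();

    // With the extension installed, view DDL is passed through to Spark's session
    // catalog; the view and column names below are illustrative only.
    spark.sql("CREATE GLOBAL TEMPORARY VIEW orders_view AS SELECT 1 AS id, 'foo' AS data");
    spark.sql("SELECT * FROM global_temp.orders_view").show();

    spark.stop();
  }
}
```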

spark-bigquery-dsv2/spark-3.5-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/BigQueryCatalogExtension.java (+72 -1)
@@ -22,8 +22,10 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
@@ -33,6 +35,9 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.apache.spark.sql.connector.catalog.ViewChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.internal.SQLConf;
@@ -41,7 +46,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BigQueryCatalogExtension implements CatalogExtension {
+public class BigQueryCatalogExtension implements CatalogExtension, ViewCatalog {
 
   private static final Logger logger = LoggerFactory.getLogger(BigQueryCatalogExtension.class);
   private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
@@ -166,6 +171,68 @@ public boolean dropTable(Identifier ident) {
   public void renameTable(Identifier oldIdent, Identifier newIdent)
       throws NoSuchTableException, TableAlreadyExistsException {}
 
+  @Override
+  public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException {
+    return delegateAsViewCatalog().listViews(namespace);
+  }
+
+  @Override
+  public View loadView(Identifier ident) throws NoSuchViewException {
+    return delegateAsViewCatalog().loadView(ident);
+  }
+
+  @Override
+  public void invalidateView(Identifier ident) {
+    delegateAsViewCatalog().invalidateView(ident);
+  }
+
+  @Override
+  public boolean viewExists(Identifier ident) {
+    return delegateAsViewCatalog().viewExists(ident);
+  }
+
+  @Override
+  public View createView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    return delegateAsViewCatalog()
+        .createView(
+            ident,
+            sql,
+            currentCatalog,
+            currentNamespace,
+            schema,
+            queryColumnNames,
+            columnAliases,
+            columnComments,
+            properties);
+  }
+
+  @Override
+  public View alterView(Identifier ident, ViewChange... changes)
+      throws NoSuchViewException, IllegalArgumentException {
+    return delegateAsViewCatalog().alterView(ident, changes);
+  }
+
+  @Override
+  public boolean dropView(Identifier ident) {
+    return delegateAsViewCatalog().dropView(ident);
+  }
+
+  @Override
+  public void renameView(Identifier oldIdent, Identifier newIdent)
+      throws NoSuchViewException, ViewAlreadyExistsException {
+    delegateAsViewCatalog().renameView(oldIdent, newIdent);
+  }
+
   private TableCatalog delegateAsTableCatalog() {
     return (TableCatalog) sessionCatalog;
   }
@@ -177,4 +244,8 @@ private FunctionCatalog delegateAsFunctionCatalog() {
   private SupportsNamespaces delegateAsSupportsNamespaces() {
     return (SupportsNamespaces) sessionCatalog;
  }
+
+  private ViewCatalog delegateAsViewCatalog() {
+    return (ViewCatalog) sessionCatalog;
+  }
 }
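All of the new ViewCatalog methods above are plain cast-and-forward calls, mirroring the existing delegateAsTableCatalog(), delegateAsFunctionCatalog(), and delegateAsSupportsNamespaces() helpers. The sketch below shows the wiring this relies on, under the assumption (the rest of the class is not part of this diff) that Spark injects its built-in session catalog through CatalogExtension.setDelegateCatalog and the extension keeps it in the sessionCatalog field.

```java
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.ViewCatalog;

// Assumed wiring only; the real field initialization lives in the unshown part of
// BigQueryCatalogExtension.
abstract class DelegatingCatalogSketch implements CatalogExtension {
  private CatalogPlugin sessionCatalog;

  @Override
  public void setDelegateCatalog(CatalogPlugin delegate) {
    // Spark calls this once with its built-in session catalog.
    this.sessionCatalog = delegate;
  }

  ViewCatalog delegateAsViewCatalog() {
    // Fails with a ClassCastException at call time if the delegated session catalog
    // does not implement ViewCatalog.
    return (ViewCatalog) sessionCatalog;
  }
}
```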

spark-bigquery-dsv2/spark-3.5-bigquery/src/test/java/com/google/cloud/spark/bigquery/integration/CatalogExtensionIntegrationTest.java (+27)
@@ -15,10 +15,37 @@
  */
 package com.google.cloud.spark.bigquery.integration;
 
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import java.util.List;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.junit.Test;
 
 public class CatalogExtensionIntegrationTest extends CatalogIntegrationTestBase {
 
+  @Test
+  public void testCreateGlobalTemporaryView() throws Exception {
+    String dataset = testDataset.testDataset;
+    String view = "test_view_" + System.nanoTime();
+
+    try (SparkSession spark = createSparkSession()) {
+      // prepare the table
+      spark.sql("CREATE TABLE " + fullTableName(dataset) + " AS SELECT 1 AS id, 'foo' AS data;");
+      Table table = bigquery.getTable(TableId.of(dataset, testTable));
+      assertThat(table).isNotNull();
+      assertThat(selectCountStarFrom(dataset, testTable)).isEqualTo(1L);
+
+      spark.sql(
+          String.format(
+              "CREATE GLOBAL TEMPORARY VIEW %s AS select * from %s", view, fullTableName(dataset)));
+      List<Row> result = spark.sql("SELECT * FROM global_temp." + view).collectAsList();
+      assertThat(result).hasSize(1);
+    }
+  }
+
   @Override
   protected SparkSession createSparkSession() {
     return SparkSession.builder()
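One detail worth noting about the new test: CREATE GLOBAL TEMPORARY VIEW registers the view in Spark's reserved global_temp database, which is why it is queried as global_temp.<view>. Global temporary views are also visible from sibling sessions of the same application. The snippet below is illustrative only and uses a plain local SparkSession rather than the test's createSparkSession() helper.

```java
import org.apache.spark.sql.SparkSession;

// Illustrative only, not part of the commit: global temporary views are registered
// under the reserved global_temp database and remain visible across sessions of the
// same Spark application.
public class GlobalTempViewSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    spark.sql("CREATE GLOBAL TEMPORARY VIEW tmp_ids AS SELECT 1 AS id");

    // A sibling session still resolves the view through the global_temp database.
    SparkSession other = spark.newSession();
    long rows = other.sql("SELECT * FROM global_temp.tmp_ids").count();
    System.out.println(rows); // prints 1

    spark.stop();
  }
}
```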

spark-bigquery-dsv2/spark-3.5-bigquery/src/test/java/com/google/cloud/spark/bigquery/integration/CatalogIntegrationTestBase.java (+3 -3)
@@ -40,7 +40,7 @@ public abstract class CatalogIntegrationTestBase {
 
   BigQuery bigquery = IntegrationTestUtils.getBigquery();
 
-  private String testTable;
+  protected String testTable;
 
   @Before
   public void renameTestTable() {
@@ -168,15 +168,15 @@ private void internalTestCreateTableWithExplicitTarget(String dataset)
     }
   }
 
-  private String fullTableName(String dataset) {
+  protected String fullTableName(String dataset) {
     return dataset.equals(DEFAULT_NAMESPACE)
         ? "`" + testTable + "`"
        : "`" + dataset + "`.`" + testTable + "`";
   }
 
   // this is needed as with direct write the table's metadata can be updated only after a few minutes.
   // Queries take pending data into account though.
-  private long selectCountStarFrom(String dataset, String table) throws InterruptedException {
+  protected long selectCountStarFrom(String dataset, String table) throws InterruptedException {
     return bigquery
         .query(
             QueryJobConfiguration.of(
