Skip to content

Commit 980200a

Browse files
committed
Get lineage out of query
Signed-off-by: Dominik Dębowczyk <[email protected]>
1 parent 19cca59 commit 980200a

File tree

3 files changed

+57
-6
lines changed

3 files changed

+57
-6
lines changed

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/OpenLineageIntegrationTestBase.java

+45
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static com.google.common.truth.Truth.assertThat;
1919

2020
import java.io.File;
21+
import java.io.FileNotFoundException;
22+
import java.io.PrintWriter;
2123
import java.util.ArrayList;
2224
import java.util.List;
2325
import java.util.Scanner;
@@ -26,6 +28,7 @@
2628
import org.apache.spark.sql.SaveMode;
2729
import org.apache.spark.sql.SparkSession;
2830
import org.json.JSONObject;
31+
import org.junit.After;
2932
import org.junit.Before;
3033
import org.junit.ClassRule;
3134
import org.junit.Test;
@@ -50,6 +53,12 @@ public void createTestTable() {
5053
testTable = "test_" + System.nanoTime();
5154
}
5255

56+
@After
57+
public void clearLineageFile() throws FileNotFoundException {
58+
PrintWriter pw = new PrintWriter(lineageFile);
59+
pw.close();
60+
}
61+
5362
protected static class CustomSessionFactory extends ExternalResource {
5463
SparkSession spark;
5564
File lineageFile;
@@ -73,6 +82,9 @@ protected void before() throws Throwable {
7382
}
7483

7584
private List<JSONObject> parseEventLogs(File file) throws Exception {
85+
// Adding a 5-second cushion, as it takes some time for all OpenLineage events to be emitted and
86+
// saved to the file
87+
Thread.sleep(5 * 1000);
7688
List<JSONObject> eventList;
7789
try (Scanner scanner = new Scanner(file)) {
7890
eventList = new ArrayList<>();
@@ -117,4 +129,37 @@ public void testLineageEvent() throws Exception {
117129
assertThat(getFieldName(event, "outputs")).matches(fullTableName);
118130
});
119131
}
132+
133+
@Test
134+
public void testLineageEventWithQueryInput() throws Exception {
135+
String fullTableName = testDataset.toString() + "." + testTable;
136+
Dataset<Row> readDF =
137+
spark
138+
.read()
139+
.format("bigquery")
140+
.option("viewsEnabled", true)
141+
.option("materializationDataset", testDataset.toString())
142+
.option("query", "SELECT * FROM `bigquery-public-data.samples.shakespeare`")
143+
.load();
144+
145+
readDF.createOrReplaceTempView("words");
146+
Dataset<Row> writeDF =
147+
spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word");
148+
writeDF
149+
.write()
150+
.format("bigquery")
151+
.mode(SaveMode.Append)
152+
.option("table", fullTableName)
153+
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
154+
.option("writeMethod", "direct")
155+
.save();
156+
List<JSONObject> eventList = parseEventLogs(lineageFile);
157+
assertThat(eventList)
158+
.isNotEmpty(); // check if there is at least one event with both input and output
159+
eventList.forEach(
160+
(event) -> { // check if each of these events have the correct input and output
161+
assertThat(getFieldName(event, "inputs")).matches(TestConstants.SHAKESPEARE_TABLE);
162+
assertThat(getFieldName(event, "outputs")).matches(fullTableName);
163+
});
164+
}
120165
}

spark-bigquery-dsv2/spark-3.1-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/v2/Spark31BigQueryTable.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,16 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
109109
@Override
110110
public Map<String, String> properties() {
111111
SparkBigQueryConfig config = injector.getInstance(SparkBigQueryConfig.class);
112-
return ImmutableMap.<String, String>builder()
113-
.put("openlineage.dataset.name", BigQueryUtil.friendlyTableName(config.getTableId()))
114-
.put("openlineage.dataset.namespace", "bigquery")
115-
.put("openlineage.dataset.storageDatasetFacet.storageLayer", "bigquery")
116-
.build();
112+
ImmutableMap.Builder<String, String> propertiesBuilder =
113+
ImmutableMap.<String, String>builder()
114+
.put("openlineage.dataset.namespace", "bigquery")
115+
.put("openlineage.dataset.storageDatasetFacet.storageLayer", "bigquery");
116+
if (config.getQuery().isPresent()) {
117+
propertiesBuilder.put("openlineage.dataset.query", config.getQuery().get());
118+
} else {
119+
propertiesBuilder.put(
120+
"openlineage.dataset.name", BigQueryUtil.friendlyTableName(config.getTableId()));
121+
}
122+
return propertiesBuilder.build();
117123
}
118124
}

spark-bigquery-parent/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
<guava.version>33.4.0-jre</guava.version>
6767
<jackson.version>2.18.2</jackson.version>
6868
<netty.version>4.1.119.Final</netty.version>
69-
<openlineage-spark.version>1.27.0</openlineage-spark.version>
69+
<openlineage-spark.version>1.31.0</openlineage-spark.version>
7070
<paranamer.version>2.8</paranamer.version>
7171
<!-- Don't upgrade protobuf until bigquerystorage is compatible with 4.x version of proto.
7272
Ref : https://github.com/protocolbuffers/protobuf/issues/16452-->

0 commit comments

Comments
 (0)