
Commit a190873

anishm-db authored and sryza committed
[SPARK-52591][SDP] Validate streaming-ness of DFs returned by SDP table and standalone flow definitions
### What changes were proposed in this pull request?

Validate that streaming flows are actually backed by streaming sources, and that batch flows are actually backed by batch sources. Also make the SDP test harnesses explicit about whether a streaming table or a materialized view is being created, to match the Python/SQL API.

### Why are the changes needed?

This change helps prevent incorrect usage of streaming/batch flows, such as directly reading from a batch source in a streaming table's flow. In that case, for example, the `STREAM` keyword should be used in SQL to mark a batch source as streaming, or `readStream` should be used in Python to stream-read from an otherwise non-streaming file source.

### Does this PR introduce _any_ user-facing change?

No, as this impacts SDP, which is not released in any Spark version yet.

### How was this patch tested?

Existing suites and added tests in `ConnectInvalidPipelineSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51208 from AnishMahto/sdp-validate-flow-streamingness.

Lead-authored-by: anishm-db <[email protected]>
Co-authored-by: Anish Mahto <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
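
For illustration, a minimal Python sketch of the usage this validation enforces. It assumes the SDP decorator API exercised in the test suites below (`@sdp.table`, `@sdp.materialized_view`) and a `spark` session provided by the pipeline runtime; the import path shown is a placeholder, not confirmed by this commit.

```python
# Hypothetical pipeline definition file; `sdp` stands in for the SDP decorator module,
# and `spark` is assumed to be supplied by the pipeline runtime (as in the test snippets below).
from pyspark import pipelines as sdp  # assumed import path

@sdp.table  # streaming table: its flow must return a streaming DataFrame
def events():
    return spark.readStream.format("rate").load()

@sdp.materialized_view  # materialized view: its flow must return a batch DataFrame
def events_summary():
    return spark.read.table("spark_catalog.default.src")
```
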
1 parent af5632f commit a190873

File tree

15 files changed: +359 −157 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 28 additions & 0 deletions
@@ -2743,6 +2743,34 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_FLOW_QUERY_TYPE" : {
+    "message" : [
+      "Flow <flowIdentifier> returns an invalid relation type."
+    ],
+    "subClass" : {
+      "BATCH_RELATION_FOR_STREAMING_TABLE" : {
+        "message" : [
+          "Streaming tables may only be defined by streaming relations, but the flow <flowIdentifier> attempts to write a batch relation to the streaming table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to convert the batch relation into a streaming relation, or populating the streaming table with an append once-flow instead."
+        ]
+      },
+      "STREAMING_RELATION_FOR_MATERIALIZED_VIEW" : {
+        "message" : [
+          "Materialized views may only be defined by a batch relation, but the flow <flowIdentifier> attempts to write a streaming relation to the materialized view <tableIdentifier>."
+        ]
+      },
+      "STREAMING_RELATION_FOR_ONCE_FLOW" : {
+        "message" : [
+          "<flowIdentifier> is an append once-flow that is defined by a streaming relation. Append once-flows may only be defined by or return a batch relation."
+        ]
+      },
+      "STREAMING_RELATION_FOR_PERSISTED_VIEW" : {
+        "message" : [
+          "Persisted views may only be defined by a batch relation, but the flow <flowIdentifier> attempts to write a streaming relation to the persisted view <viewIdentifier>."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "INVALID_FORMAT" : {
     "message" : [
       "The format is invalid: <format>."

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 1 addition & 1 deletion
@@ -161,7 +161,7 @@ private[connect] object PipelinesHandler extends Logging {
           language = Option(Python())),
         format = Option.when(dataset.hasFormat)(dataset.getFormat),
         normalizedPath = None,
-        isStreamingTableOpt = None))
+        isStreamingTable = dataset.getDatasetType == proto.DatasetType.TABLE))
     case proto.DatasetType.TEMPORARY_VIEW =>
       val viewIdentifier =
         GraphIdentifierManager.parseTableIdentifier(dataset.getDatasetName, sparkSession)

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala

Lines changed: 7 additions & 7 deletions
@@ -93,7 +93,7 @@ class PythonPipelineSuite
     val graph = buildGraph("""
        |@sdp.table
        |def table1():
-       |  return spark.range(10)
+       |  return spark.readStream.format("rate").load()
        |""".stripMargin)
       .resolve()
       .validate()
@@ -112,11 +112,11 @@ class PythonPipelineSuite
        |def c():
        |  return spark.readStream.table("a")
        |
-       |@sdp.table()
+       |@sdp.materialized_view()
        |def d():
        |  return spark.read.table("a")
        |
-       |@sdp.table()
+       |@sdp.materialized_view()
        |def a():
        |  return spark.range(5)
        |""".stripMargin)
@@ -177,11 +177,11 @@ class PythonPipelineSuite
   test("referencing external datasets") {
     sql("CREATE TABLE spark_catalog.default.src AS SELECT * FROM RANGE(5)")
     val graph = buildGraph("""
-       |@sdp.table
+       |@sdp.materialized_view
        |def a():
        |  return spark.read.table("spark_catalog.default.src")
        |
-       |@sdp.table
+       |@sdp.materialized_view
        |def b():
        |  return spark.table("spark_catalog.default.src")
        |
@@ -230,11 +230,11 @@ class PythonPipelineSuite
        |def a():
        |  return spark.read.table("spark_catalog.default.src")
        |
-       |@sdp.table
+       |@sdp.materialized_view
        |def b():
        |  return spark.table("spark_catalog.default.src")
        |
-       |@sdp.table
+       |@sdp.materialized_view
        |def c():
        |  return spark.readStream.table("spark_catalog.default.src")
        |""".stripMargin).resolve()

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -206,7 +206,7 @@ class SparkDeclarativePipelinesServerSuite
         sql = Some("SELECT * FROM STREAM tableA"))
       createTable(
         name = "tableC",
-        datasetType = DatasetType.TABLE,
+        datasetType = DatasetType.MATERIALIZED_VIEW,
         sql = Some("SELECT * FROM tableB"))
     }

@@ -238,7 +238,7 @@ class SparkDeclarativePipelinesServerSuite
       createView(name = "viewC", sql = "SELECT * FROM curr.tableB")
       createTable(
         name = "other.tableD",
-        datasetType = proto.DatasetType.TABLE,
+        datasetType = proto.DatasetType.MATERIALIZED_VIEW,
         sql = Some("SELECT * FROM viewC"))
     }

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala

Lines changed: 1 addition & 7 deletions
@@ -80,12 +80,6 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
         val resolvedFlowsToTable = flowsToTable.map { flow =>
           resolvedFlowNodesMap.get(flow.identifier)
         }
-
-        // Assign isStreamingTable (MV or ST) to the table based on the resolvedFlowsToTable
-        val tableWithType = table.copy(
-          isStreamingTableOpt = Option(resolvedFlowsToTable.exists(f => f.df.isStreaming))
-        )
-
         // We mark all tables as virtual to ensure resolution uses incoming flows
         // rather than previously materialized tables.
         val virtualTableInput = VirtualTableInput(
@@ -95,7 +89,7 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
           availableFlows = resolvedFlowsToTable
         )
         resolvedInputs.put(table.identifier, virtualTableInput)
-        Seq(tableWithType)
+        Seq(table)
       case view: View =>
         // For view, add the flow to resolvedInputs and return empty.
        require(upstreamNodes.size == 1, "Found multiple flows to view")

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
       validatePersistedViewSources()
       validateEveryDatasetHasFlow()
       validateTablesAreResettable()
+      validateFlowStreamingness()
       inferredSchema
     }.failed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala

Lines changed: 2 additions & 2 deletions
@@ -178,15 +178,15 @@ object DatasetManager extends Logging {
     }

     // Wipe the data if we need to
-    if ((isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined) {
+    if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) {
       context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
     }

     // Alter the table if we need to
     if (existingTableOpt.isDefined) {
       val existingSchema = existingTableOpt.get.schema()

-      val targetSchema = if (table.isStreamingTableOpt.get && !isFullRefresh) {
+      val targetSchema = if (table.isStreamingTable && !isFullRefresh) {
         SchemaMergingUtils.mergeSchemas(existingSchema, outputSchema)
       } else {
         outputSchema

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 76 additions & 0 deletions
@@ -55,6 +55,82 @@ trait GraphValidations extends Logging {
     multiQueryTables
   }

+  /**
+   * Validate that each resolved flow is correctly either a streaming flow or non-streaming flow,
+   * depending on the flow type (ex. once flow vs non-once flow) and the dataset type the flow
+   * writes to (ex. streaming table vs materialized view).
+   */
+  protected[graph] def validateFlowStreamingness(): Unit = {
+    flowsTo.foreach { case (destTableIdentifier, flowsToDataset) =>
+      // The identifier should correspond to exactly one of a table or view
+      val destTableOpt = table.get(destTableIdentifier)
+      val destViewOpt = view.get(destTableIdentifier)
+
+      val resolvedFlowsToDataset: Seq[ResolvedFlow] = flowsToDataset.collect {
+        case rf: ResolvedFlow => rf
+      }
+
+      resolvedFlowsToDataset.foreach { resolvedFlow: ResolvedFlow =>
+        // A flow must be successfully analyzed, thus resolved, in order to determine if it is
+        // streaming or not. Unresolved flows will throw an exception anyway via
+        // [[validateSuccessfulFlowAnalysis]], so don't check them here.
+        if (resolvedFlow.once) {
+          // Once flows by definition should be batch flows, not streaming.
+          if (resolvedFlow.df.isStreaming) {
+            throw new AnalysisException(
+              errorClass = "INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_ONCE_FLOW",
+              messageParameters = Map(
+                "flowIdentifier" -> resolvedFlow.identifier.quotedString
+              )
+            )
+          }
+        } else {
+          destTableOpt.foreach { destTable =>
+            if (destTable.isStreamingTable) {
+              if (!resolvedFlow.df.isStreaming) {
+                throw new AnalysisException(
+                  errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_STREAMING_TABLE",
+                  messageParameters = Map(
+                    "flowIdentifier" -> resolvedFlow.identifier.quotedString,
+                    "tableIdentifier" -> destTableIdentifier.quotedString
+                  )
+                )
+              }
+            } else {
+              if (resolvedFlow.df.isStreaming) {
+                // This check intentionally does NOT prevent materialized views from reading from
+                // a streaming table using a _batch_ read, which is still considered valid.
+                throw new AnalysisException(
+                  errorClass = "INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_MATERIALIZED_VIEW",
+                  messageParameters = Map(
+                    "flowIdentifier" -> resolvedFlow.identifier.quotedString,
+                    "tableIdentifier" -> destTableIdentifier.quotedString
+                  )
+                )
+              }
+            }
+          }
+
+          destViewOpt.foreach {
+            case _: PersistedView =>
+              if (resolvedFlow.df.isStreaming) {
+                throw new AnalysisException(
+                  errorClass = "INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_PERSISTED_VIEW",
+                  messageParameters = Map(
+                    "flowIdentifier" -> resolvedFlow.identifier.quotedString,
+                    "viewIdentifier" -> destTableIdentifier.quotedString
+                  )
+                )
+              }
+            case _: TemporaryView =>
+              // Temporary views' flows are allowed to be either streaming or batch, so no
+              // validation needs to be done for them
+          }
+        }
+      }
+    }
+  }
+
   /** Throws an exception if the flows in this graph are not topologically sorted. */
   protected[graph] def validateGraphIsTopologicallySorted(): Unit = {
     val visitedNodes = mutable.Set.empty[TableIdentifier] // Set of visited nodes
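
A short sketch of the materialized-view side of this check, again assuming the Python decorator API from the test suites and a runtime-provided `spark` session; as the comment in the validation above notes, a batch read of a streaming table remains valid:

```python
@sdp.materialized_view
def mv_bad():
    # Streaming relation -> INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_MATERIALIZED_VIEW
    return spark.readStream.table("a")

@sdp.materialized_view
def mv_ok():
    # Batch read of table "a" (even if "a" is a streaming table) is still allowed
    return spark.read.table("a")
```
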

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala

Lines changed: 3 additions & 3 deletions
@@ -199,7 +199,7 @@ class SqlGraphRegistrationContext(
           ),
           format = cst.tableSpec.provider,
           normalizedPath = None,
-          isStreamingTableOpt = None
+          isStreamingTable = true
         )
       )
     }
@@ -230,7 +230,7 @@ class SqlGraphRegistrationContext(
           ),
           format = cst.tableSpec.provider,
           normalizedPath = None,
-          isStreamingTableOpt = None
+          isStreamingTable = true
         )
       )

@@ -281,7 +281,7 @@ class SqlGraphRegistrationContext(
           ),
           format = cmv.tableSpec.provider,
           normalizedPath = None,
-          isStreamingTableOpt = None
+          isStreamingTable = false
         )
       )

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala

Lines changed: 2 additions & 14 deletions
@@ -114,8 +114,7 @@ sealed trait TableInput extends Input {
  *             path (if not defined, we will normalize a managed storage path for it).
  * @param properties Table Properties to set in table metadata.
  * @param comment User-specified comment that can be placed on the table.
- * @param isStreamingTableOpt if the table is a streaming table, will be None until we have resolved
- *                            flows into table
+ * @param isStreamingTable if the table is a streaming table, as defined by the source code.
  */
 case class Table(
     identifier: TableIdentifier,
@@ -125,7 +124,7 @@ case class Table(
     properties: Map[String, String] = Map.empty,
     comment: Option[String],
     baseOrigin: QueryOrigin,
-    isStreamingTableOpt: Option[Boolean],
+    isStreamingTable: Boolean,
     format: Option[String]
 ) extends TableInput
   with Output {
@@ -163,17 +162,6 @@ case class Table(
       normalizedPath.get
     }

-  /**
-   * Tell if a table is a streaming table or not. This property is not set until we have resolved
-   * the flows into the table. The exception reminds engineers that they cant call at random time.
-   */
-  def isStreamingTable: Boolean = isStreamingTableOpt.getOrElse {
-    throw new IllegalStateException(
-      "Cannot identify whether the table is streaming table or not. You may need to resolve the " +
-      "flows into table."
-    )
-  }
-
   /**
    * Get the DatasetType of the table
    */