[SPARK-52823][SQL] Support DSv2 Join pushdown for Oracle connector #51519
Conversation
      checkAnswer(df, rows)
    }
  }
}
can we have an ANTI JOIN as well?
or is that unsupported?
Anti join is not supported for pushdown, but yes, Spark has anti join.
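For reference, a quick illustration of an anti join in Spark (the DataFrames below are made up; per the comment above, this join type is not pushed down):

// Assumes an active SparkSession named `spark`.
import spark.implicits._

val left = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v")
val right = Seq(1, 3).toDF("id")

// LEFT ANTI keeps rows from `left` that have no match in `right`, so only (2, "b") remains.
val unmatched = left.join(right, Seq("id"), "left_anti")
unmatched.show()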
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.internal.SQLConf

trait V2JDBCPushdownTestUtils extends ExplainSuiteHelper {
What do you think about DataSourcePushdownTestUtils?
Out of the v2 folder, in the connector folder.
I've put it in sql/connector
Nice, perfect
protected def caseConvert(tableName: String): String = tableName

protected def withConnection[T](f: Connection => T): T = {
Can we make this suite more generic and decouple it from JDBC, so it can be reused by other non-JDBC connectors?
Maybe some class hierarchy like:
JoinPushdownIntegrationSuiteBase
JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase
OracleJoinPushdownIntegrationSuiteBase extends JDBCJoinPushdownIntegrationSuiteBase
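A rough Scala sketch of that layering (the member names and bodies here are illustrative assumptions, not the PR's code):

import java.sql.{Connection, DriverManager}
import org.apache.spark.SparkFunSuite

// Connector-agnostic contract: table names, data seeding hooks, and the shared join test cases.
trait JoinPushdownIntegrationSuiteBase extends SparkFunSuite {
  def dataPreparation(): Unit
}

// Adds the JDBC-specific wiring: connection URL and connection helpers.
trait JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase {
  def url: String
  protected def withConnection[T](f: Connection => T): T = {
    val conn = DriverManager.getConnection(url)
    try f(conn) finally conn.close()
  }
}

// Oracle-specific docker setup and dialect overrides would live at this level.
trait OracleJoinPushdownIntegrationSuiteBase extends JDBCJoinPushdownIntegrationSuiteBase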
I would do it in a separate PR if that's fine with you.
I am OK with that; it causes a little extra work for reviewers, but that is fine. Just keep in mind that we need another layer of abstraction, so we can do a follow-up more easily.
val random = new java.util.Random(42)
val table1Data = (1 to 100).map { i =>
  val id = i % 11
  val amount = BigDecimal.valueOf(random.nextDouble() * 10000)
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
  val address = s"address_$i"
  (id, amount, address)
}
val table2Data = (1 to 100).map { i =>
  val id = (i % 17)
  val next_id = (id + 1) % 17
  val salary = BigDecimal.valueOf(random.nextDouble() * 50000)
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
  val surname = s"surname_$i"
  (id, next_id, salary, surname)
}
Can this cause some flakiness?
Is 42 the seed of the Random function?
42 is the seed, so it shouldn't be flaky.
Makes sense. However, I think it is better to materialize the results as static rows so they are more readable. Without a debugger, developers won't know what our test data is, and it is also hard to modify later if we need to.
That's a good point, I have materialized them. Also, I added batch execution for the JDBC statements; previously we were doing 100 network calls, now only one.
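A minimal sketch of what batched insertion of the seeded rows can look like (the table and column names are made up for illustration; this is not the PR's exact code):

import java.sql.Connection

def insertTable1Data(conn: Connection, rows: Seq[(Int, BigDecimal, String)]): Unit = {
  // One prepared statement and one batch: a single network round trip instead of one call per row.
  val stmt = conn.prepareStatement("INSERT INTO join_table_1 (id, amount, address) VALUES (?, ?, ?)")
  try {
    rows.foreach { case (id, amount, address) =>
      stmt.setInt(1, id)
      stmt.setBigDecimal(2, amount.bigDecimal) // scala.math.BigDecimal -> java.math.BigDecimal
      stmt.setString(3, address)
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
  }
}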
override val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"

override val catalogName: String = "h2"
override val namespaceOpt: Option[String] = Some("test")
Why is it an Option? Most dialects must have a schema, right?
I've followed a similar design to V2JDBCTest, which has namespaceOpt as an Option. For example, MsSqlServerIntegrationSuite and MySQLIntegrationSuite don't override it.
I guess these tests will still work if we add a schema; I just wanted to be consistent with V2JDBCTest.
It is maybe a personal preference, but I actually like to put the schema explicitly. It is a little bit confusing to me that we have tests where we use just catalog.table, because it is unintuitive to use a 2-part identifier for catalog.table instead of schema.table.
I don't have any preference here. I can make the schema not be an Option if it's a red flag.
.set("spark.sql.catalog.h2.pushDownAggregate", "true")
.set("spark.sql.catalog.h2.pushDownLimit", "true")
.set("spark.sql.catalog.h2.pushDownOffset", "true")
.set("spark.sql.catalog.h2.pushDownJoin", "true")
Shall we move most of the conf settings to the parent class? Then here we only need:
override def sparkConf: SparkConf = super.sparkConf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")
Moved the common confs to the base trait.
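A sketch of the shape this can take (assumed, not the PR's exact code): the shared catalog and pushdown confs, keyed on the suite's catalog name, sit in the base trait, and a concrete suite only adds connector-specific settings.

import org.apache.spark.SparkConf

trait JDBCV2JoinPushdownIntegrationSuiteBase {
  val catalogName: String = "join_pushdown_catalog"
  def url: String

  // Common pushdown confs shared by all concrete suites.
  def sparkConf: SparkConf = new SparkConf()
    .set(s"spark.sql.catalog.$catalogName",
      "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .set(s"spark.sql.catalog.$catalogName.url", url)
    .set(s"spark.sql.catalog.$catalogName.pushDownAggregate", "true")
    .set(s"spark.sql.catalog.$catalogName.pushDownLimit", "true")
    .set(s"spark.sql.catalog.$catalogName.pushDownOffset", "true")
    .set(s"spark.sql.catalog.$catalogName.pushDownJoin", "true")
}

A concrete H2 suite would then only need something like:
override def sparkConf: SparkConf = super.sparkConf.set(s"spark.sql.catalog.$catalogName.driver", "org.h2.Driver")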
val tempDir = Utils.createTempDir()
override val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"

override val catalogName: String = "h2"
Seems the catalog name doesn't matter for the test cases; shall we just hardcode jdbc_test in the parent class?
As above, I followed the pattern in V2JDBCTest. Changed it to join_pushdown_catalog in the common trait.
  }
}

def dataPreparation(connection: Connection): Unit = {
Can you add docs here describing a little bit what tables and schemas we need to create?
done, is it better?
  }
}

def dataPreparation(connection: Connection): Unit = {
To make a follow-up easier later, can we remove the connection argument and have each of these 3 create its own connection? Or, if you want to save some calls, we can store the connection somewhere.
done
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
You can provide a sequence of relations here instead of doing string concatenation; it would be a better fit for your signature. Also, can you name the parameter here?
Suggested change:
expectedTables = Seq(
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
)
Not really? If you have duplicate tables, as in my case, a sequence would not assert that the table is shown twice in the explain output. With the string, we are asserting that.
withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
  val df = sql(sqlQuery)

  checkJoinPushed(
We did not check whether column pruning actually happened; that can be done by checking the leaf relation output.
Done now. UUIDs are used in the column names, so it's kind of hard to add an expected schema.
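A sketch of one way to do that check (the helper name and the expected-count parameter are hypothetical, not the PR's actual method):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

def checkPrunedColumns(df: DataFrame, expectedColumnCount: Int): Unit = {
  // Collect the leaf DSv2 scan relations from the optimized plan.
  val scans = df.queryExecution.optimizedPlan.collect {
    case r: DataSourceV2ScanRelation => r
  }
  assert(scans.nonEmpty, "Expected at least one DataSourceV2ScanRelation")
  // If column pruning (and join pushdown) happened, the pushed scan should
  // expose only the columns the query actually needs.
  assert(scans.head.output.length == expectedColumnCount)
}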
SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
  val df = sql(sqlQuery)

  checkSortRemoved(df)
This one depends on the scan builder implementor; we should make an overridable member which tells whether sort-with-limit pushdown is supported and provide that value here, like:
checkSortRemoved(df, sortLimitPushdownSupported)
Added supportsXYZpushdown members in DataSourcePushdownTestUtils. Suites that extend it can override these values if some of the pushdowns are not supported.
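A sketch of the assumed shape of those flags (names and bodies are illustrative, not the PR's exact code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SortExec

trait DataSourcePushdownTestUtils {
  // Connectors that cannot push a sort together with a limit override this to false.
  protected val supportsSortLimitPushdown: Boolean = true

  protected def checkSortRemoved(df: DataFrame, pushed: Boolean = supportsSortLimitPushdown): Unit = {
    val sorts = df.queryExecution.executedPlan.collect { case s: SortExec => s }
    if (pushed) assert(sorts.isEmpty) else assert(sorts.nonEmpty)
  }
}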
// This method comes from DockerJDBCIntegrationSuite
override def dataPreparation(connection: Connection): Unit = {
  super.dataPreparation()
So this override is a no-op?
No, it calls dataPreparation from JDBCV2JoinPushdownIntegrationSuiteBase, which has a default implementation.
I just created dataPreparation() to be compatible with future non-JDBC connectors.
private def catalogAndNamespace = s"$catalogName.$namespace"

def qualifyTableName(tableName: String): String = s"$namespace.$tableName"
Since we have jdbcDialect, shall we just call it to qualify the names?
Yes, I made the change.
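For illustration, qualifying through the dialect could look roughly like this (quoteIdentifier is part of the JdbcDialect API; the helper around it is just a sketch, not the PR's code):

import org.apache.spark.sql.jdbc.JdbcDialect

// Quote both parts through the dialect so identifier quoting follows the connector's rules.
def qualifyTableName(dialect: JdbcDialect, namespace: String, tableName: String): String =
  s"${dialect.quoteIdentifier(namespace)}.${dialect.quoteIdentifier(tableName)}"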
What changes were proposed in this pull request?
In #50921, Join pushdown was added for DSv2 and it was only enabled for H2 dialect.
With this PR, I am enabling DSv2 join pushdown for Oracle connector as well.
For this purpose, OracleDialect now has supportsJoin equal to true. Also, changed the SQL query generation to use the tableOrQuery method instead of options.tableOrQuery.
The rest of the change is test only:
- Extracted pushdown test helpers from V2JDBCTest into the new trait V2JDBCPushdownTestUtils
- Added JDBCJoinPushdownIntegrationSuite that can be used for testing other connectors as well
- Added OracleJoinPushdownIntegrationSuite as the first implementation of the trait
- Changed JDBCV2JoinPushdownSuite to inherit JDBCJoinPushdownIntegrationSuite
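A minimal sketch of the dialect-side flag described above (the exact member signature is assumed here; this is not the actual Spark source):

import org.apache.spark.sql.jdbc.JdbcDialect

case object OracleDialectSketch extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(java.util.Locale.ROOT).startsWith("jdbc:oracle")

  // Assumed signature: opts the Oracle dialect into DSv2 join pushdown.
  override def supportsJoin: Boolean = true
}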
Why are the changes needed?
Does this PR introduce any user-facing change?
Inner joins will be pushed down to the Oracle data source only if the spark.sql.optimizer.datasourceV2JoinPushdown SQL conf is set to true. Currently, the default value is false.
Previously, a Spark SQL join query would produce an optimized plan with a separate scan for each table, with the join executed in Spark. Now, with join pushdown enabled, the optimized plan contains a single scan that performs the join on the data source side.
When the join is pushed down, the physical plan will contain PushedJoins information, which is the array of all the tables joined, and the join appears in the SQL query generated for the data source.
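For illustration, a user would enable the conf and run a join against an Oracle-backed catalog roughly like this (the catalog, schema, and table names below are made up; the catalog must also have pushDownJoin enabled):

// Enable DSv2 join pushdown at the session level.
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")

val joined = spark.sql(
  """SELECT t1.id, t2.salary
    |FROM oracle_catalog.test_schema.table1 t1
    |JOIN oracle_catalog.test_schema.table2 t2 ON t1.id = t2.id""".stripMargin)

// If the join was pushed down, the scan node in the physical plan reports PushedJoins.
joined.explain()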
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?