[SPARK-52823][SQL] Support DSv2 Join pushdown for Oracle connector #51519


Open · wants to merge 7 commits into master from support_join_for_oracle
Conversation

@PetarVasiljevic-DB (Contributor) commented Jul 16, 2025

What changes were proposed in this pull request?

In #50921, join pushdown was added for DSv2, but it was only enabled for the H2 dialect.
With this PR, I am enabling DSv2 join pushdown for the Oracle connector as well.

For this purpose, OracleDialect now has supportsJoin set to true.
I also changed the SQL query generation to use the tableOrQuery method instead of options.tableOrQuery.
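A minimal sketch of the dialect-side change (supportsJoin is the name given above; the exact signature and surrounding code are assumptions):

import java.util.Locale
import org.apache.spark.sql.jdbc.JdbcDialect

private case object OracleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")

  // Advertise that pushed-down joins can be compiled to Oracle SQL.
  override def supportsJoin: Boolean = true
}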

The rest of the change is test-only:

  • Extracted pushdown util methods from V2JDBCTest into a new trait, V2JDBCPushdownTestUtils
  • Created a new integration trait, JDBCJoinPushdownIntegrationSuite, that can be used for testing other connectors as well
  • Added OracleJoinPushdownIntegrationSuite as the first implementation of the trait
  • Changed JDBCV2JoinPushdownSuite to inherit from JDBCJoinPushdownIntegrationSuite

Why are the changes needed?

Join pushdown was previously enabled only for the H2 dialect; this change extends it to the Oracle connector so that inner joins can be evaluated inside Oracle rather than in Spark.

Does this PR introduce any user-facing change?

Inner joins will be pushed down to the Oracle data source only if the spark.sql.optimizer.datasourceV2JoinPushdown SQL conf is set to true. Currently, the default value is false.
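For example, the flag can be enabled per session (a usage sketch; the conf key is quoted from this description):

// Opt in to DSv2 join pushdown; the default is false.
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")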

Previously, the Spark SQL query

SELECT t1.id, t1.name, t2.id 
FROM oracleCatalog.tbl1 t1 
JOIN oracleCatalog.tbl2 t2 
ON t1.id = t2.id + 1

would produce the following Optimized plan:

== Optimized Logical Plan ==
Join Inner, (id#0 = (id#1 + 1))
:- Filter isnotnull(id#0)
:  +- RelationV2[id#0] oracleCatalog.tbl1
+- Filter isnotnull(id#1)
   +- RelationV2[id#1, name#2] oracleCatalog.tbl2

Now, with join pushdown enabled, the plan would be:

Project [ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c#3 AS id#0, ID#4 AS id#1, NAME#5 AS name#2]
+- RelationV2[ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c#3, ID#4, NAME#5] oracleCatalog.tbl1

When the join is pushed down, the physical plan will contain PushedJoins information, which is an array of all the joined tables. For example, in the above case it would be:

PushedJoins: [oracleCatalog.tbl1, oracleCatalog.tbl2]

The generated SQL query would be:

SELECT
    "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c",
    "ID",
    "NAME"
FROM
    (
        SELECT
            "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c",
            "ID",
            "NAME"
        FROM
            (
                SELECT
                    "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c",
                    "ID",
                    "NAME"
                FROM
                    (
                        SELECT
                            "ID" AS "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c",
                            "NAME"
                        FROM
                            "SYSTEM"."TBL1"
                        WHERE
                            ("ID" IS NOT NULL)
                    ) join_subquery_4
                    INNER JOIN (
                        SELECT
                            "ID"
                        FROM
                            "SYSTEM"."TBL2"
                        WHERE
                            ("ID" IS NOT NULL)
                    ) join_subquery_5 ON "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c" = "ID"
            )
    ) SPARK_GEN_SUBQ_30

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

@github-actions bot added the SQL label Jul 16, 2025
checkAnswer(df, rows)
}
}
}
Contributor:

can we have an ANTI JOIN as well?
or is that unsupported?

PetarVasiljevic-DB (author):

Anti join is not supported for pushdown, but yes, Spark has anti joins.
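For reference, a minimal anti-join example (an illustrative query against the catalogs used above; it runs in Spark but, per the above, is not pushed down):

// Rows of tbl1 whose id has no match in tbl2; the anti join is executed by Spark.
val antiJoined = spark.sql(
  """SELECT t1.id
    |FROM oracleCatalog.tbl1 t1
    |LEFT ANTI JOIN oracleCatalog.tbl2 t2 ON t1.id = t2.id
    |""".stripMargin)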

import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.internal.SQLConf

trait V2JDBCPushdownTestUtils extends ExplainSuiteHelper {
Contributor:

What do you think about DataSourcePushdownTestUtils?

urosstan-db (Jul 17, 2025):

Out of the v2 folder, into the connector folder.

PetarVasiljevic-DB (author):

I've put it in sql/connector

Contributor:

Nice, perfect


protected def caseConvert(tableName: String): String = tableName

protected def withConnection[T](f: Connection => T): T = {
urosstan-db (Jul 17, 2025):

Can we make this suite more generic and decouple it from JDBC, so it can be reused by other non-JDBC connectors?

Maybe some class hierarchy like the following (a minimal sketch follows the list):

  • JoinPushdownIntegrationSuiteBase
  • JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase
  • OracleJoinPushdownIntegrationSuiteBase extends JDBCJoinPushdownIntegrationSuiteBase
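A minimal sketch of the proposed hierarchy (class names from the suggestion above; the members are illustrative assumptions):

// Connector-agnostic base: owns the join pushdown test cases and abstract hooks.
trait JoinPushdownIntegrationSuiteBase {
  def catalogName: String
  def dataPreparation(): Unit // no JDBC types in the signature
}

// JDBC layer: default data preparation that goes through a JDBC connection.
trait JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase {
  override def dataPreparation(): Unit = { /* create tables via JDBC */ }
}

// Oracle binding: supplies the Oracle-specific catalog details.
class OracleJoinPushdownIntegrationSuite extends JDBCJoinPushdownIntegrationSuiteBase {
  override val catalogName: String = "oracle"
}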

PetarVasiljevic-DB (author):

I would do it in a separate PR if that's fine with you.

Contributor:

I am OK with that; it causes a little extra work for reviewers, but that is fine. Just keep in mind that we need another layer of abstraction, so we can more easily do a follow-up.

Comment on lines 98 to 113
val random = new java.util.Random(42)
val table1Data = (1 to 100).map { i =>
  val id = i % 11
  val amount = BigDecimal.valueOf(random.nextDouble() * 10000)
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
  val address = s"address_$i"
  (id, amount, address)
}
val table2Data = (1 to 100).map { i =>
  val id = (i % 17)
  val next_id = (id + 1) % 17
  val salary = BigDecimal.valueOf(random.nextDouble() * 50000)
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
  val surname = s"surname_$i"
  (id, next_id, salary, surname)
}
Contributor:

Can this cause some flakiness?

Contributor:

Is 42 the seed of the Random function?

PetarVasiljevic-DB (author):

42 is the seed, so it shouldn't be flaky.

Contributor:

Makes sense; however, I think it is better to materialize the results as static rows so it is more readable. Without a debugger, developers won't know what our test data are, and it is also hard to modify later if we need to.

PetarVasiljevic-DB (author):

That's a good point. I have materialized them. I also added batch execution for the JDBC statements; previously we were doing 100 network calls, now only 1.
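A sketch of the batching pattern described here, using the standard java.sql batch API (table and column names are illustrative):

import java.sql.Connection

// Queue all inserts locally and send them in a single round trip.
def insertBatch(conn: Connection, rows: Seq[(Int, String)]): Unit = {
  val stmt = conn.prepareStatement("INSERT INTO tbl1 (id, name) VALUES (?, ?)")
  try {
    rows.foreach { case (id, name) =>
      stmt.setInt(1, id)
      stmt.setString(2, name)
      stmt.addBatch() // no network call yet
    }
    stmt.executeBatch() // one network call for all rows
  } finally {
    stmt.close()
  }
}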

@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch from d5cf5f1 to e6bdb53 on July 17, 2025 at 20:17
override val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"

override val catalogName: String = "h2"
override val namespaceOpt: Option[String] = Some("test")
cloud-fan (Jul 18, 2025):

why is it an Option? most dialects must have a schema, right?

PetarVasiljevic-DB (author):

I've followed a similar design to V2JDBCTest, which has namespaceOpt as an Option. For example, MsSqlServerIntegrationSuite and MySQLIntegrationSuite don't override it.

I guess these tests will still work if we add a schema; I just wanted to be consistent with V2JDBCTest.

Contributor:

It is maybe personal preference, but I actually like to put the schema explicitly. It is a little confusing to me that we have tests where we use just catalog.table, because it is unintuitive to use a two-part identifier for catalog.table instead of schema.table.

PetarVasiljevic-DB (author):

I don't have any preference here. I can make the schema not be an Option if it's a red flag.

.set("spark.sql.catalog.h2.pushDownAggregate", "true")
.set("spark.sql.catalog.h2.pushDownLimit", "true")
.set("spark.sql.catalog.h2.pushDownOffset", "true")
.set("spark.sql.catalog.h2.pushDownJoin", "true")
Contributor:

shall we move most of the conf settings to the parent class? then here we only need

override def sparkConf: SparkConf = super.sparkConf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

PetarVasiljevic-DB (author):

Moved the common confs to base trait

val tempDir = Utils.createTempDir()
override val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"

override val catalogName: String = "h2"
Contributor:

seems the catalog name doesn't matter for the test cases, shall we just hardcode jdbc_test in the parent class?

PetarVasiljevic-DB (author):

As above, I followed the pattern in V2JDBCTest. Changed it to join_pushdown_catalog in the common trait.

}
}

def dataPreparation(connection: Connection): Unit = {
Contributor:

Can you add docs here to describe a little what tables and schemas we need to create?

PetarVasiljevic-DB (author):

done, is it better?

}
}

def dataPreparation(connection: Connection): Unit = {
Contributor:

To make a follow-up easier later, can we remove the connection argument and have each of these 3 make its own connection? Or, if you want to save some calls, we can store the connection somewhere.

PetarVasiljevic-DB (author):

done

Comment on lines 207 to 240
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
Contributor:

You can provide a sequence of relations here instead of doing string concatenation; it would be a better fit for your signature.
Also, can you name the parameter here?

Suggested change
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
expectedTables = Seq(
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
)

PetarVasiljevic-DB (author):

Not really? If you have duplicate tables, like in my case, a Seq doesn't assert that the table is shown 2 times in the explain output.
With a string, we are asserting that.

withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
val df = sql(sqlQuery)

checkJoinPushed(
Contributor:

We did not check whether column pruning actually happened; that can be done by checking the leaf relation output.

PetarVasiljevic-DB (author):

Done now. UUIDs are used in column names, so it's kind of hard to add an expected schema.
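A sketch of the kind of pruning check being discussed, assuming the scan surfaces as a DataSourceV2ScanRelation leaf (the helper name is hypothetical):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Assert on the column count of the pruned scan instead of on the
// UUID-suffixed column names, which are not stable across runs.
def checkPrunedColumns(df: DataFrame, expectedNumColumns: Int): Unit = {
  val scans = df.queryExecution.optimizedPlan.collect {
    case r: DataSourceV2ScanRelation => r
  }
  assert(scans.nonEmpty, "expected a DSv2 scan relation in the plan")
  assert(scans.head.output.size == expectedNumColumns)
}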

SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
val df = sql(sqlQuery)

checkSortRemoved(df)
Contributor:

This one depends on the scan builder implementation; we should make an overridable member that tells whether sort-limit pushdown is supported and provide that value here. Like:

Suggested change
checkSortRemoved(df)
checkSortRemoved(df, sortLimitPushdownSupported)

PetarVasiljevic-DB (author):

Added supportsXYZpushdown flags in DataSourcePushdownTestUtils. Suites that extend it can override these values if some of the pushdowns are not supported.
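A minimal sketch of that pattern (flag names are illustrative; the actual trait may differ):

trait DataSourcePushdownTestUtils {
  // Connectors that cannot push a given operator override these to false,
  // and the shared assertions adapt accordingly.
  protected val supportsLimitPushdown: Boolean = true
  protected val supportsSortLimitPushdown: Boolean = true
  protected val supportsJoinPushdown: Boolean = true
}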

@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch 2 times, most recently from d61b609 to f26b1a1 on July 18, 2025 at 11:27

// This method comes from DockerJDBCIntegrationSuite
override def dataPreparation(connection: Connection): Unit = {
super.dataPreparation()
Contributor:

so this override is a no-op?

PetarVasiljevic-DB (author):

No, it calls dataPreparation from JDBCV2JoinPushdownIntegrationSuiteBase, which has a default implementation.

I just created dataPreparation() to be compatible with future non-JDBC connectors.


private def catalogAndNamespace = s"$catalogName.$namespace"

def qualifyTableName(tableName: String): String = s"$namespace.$tableName"
Contributor:

since we have jdbcDialect, shall we just call it to qualify the names?

PetarVasiljevic-DB (author):

yes, did the change.
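For reference, a sketch of qualifying names through the dialect instead of by hand, using JdbcDialect.quoteIdentifier (the exact call sites in the PR may differ):

import org.apache.spark.sql.jdbc.JdbcDialect

// Let the dialect quote each part, e.g. "SCHEMA"."TABLE" for Oracle,
// instead of concatenating raw strings.
def qualifyTableName(dialect: JdbcDialect, schema: String, table: String): String =
  s"${dialect.quoteIdentifier(schema)}.${dialect.quoteIdentifier(table)}"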

@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch from f010dd5 to 76c7a6f on July 19, 2025 at 01:06
@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch from 76c7a6f to d29b0a9 on July 19, 2025 at 12:09
@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch 5 times, most recently from a690e55 to 6b54dec on July 20, 2025 at 09:55
@PetarVasiljevic-DB force-pushed the support_join_for_oracle branch from 6b54dec to 7d6f701 on July 20, 2025 at 12:01