Skip to content

Commit eeee6b0

Browse files
authored
Add --substrait-round-trip option in sqllogictests (#16183)
* Add substrait roundtrip option in sqllogictests * Fix doc link and missing license header * Add README.md entry for the Substrait round-trip mode * Link tracking issue in README.md * Use clap's `conflicts_with` instead of manually checking flag compatibility * Add sqllogictest-substrait job to the CI * Revert committed formatting changes to README.md
1 parent 5e307b3 commit eeee6b0

File tree

9 files changed

+291
-6
lines changed

9 files changed

+291
-6
lines changed

.github/workflows/rust.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,28 @@ jobs:
476476
POSTGRES_HOST: postgres
477477
POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}
478478

479+
sqllogictest-substrait:
480+
name: "Run sqllogictest in Substrait round-trip mode"
481+
needs: linux-build-lib
482+
runs-on: ubuntu-latest
483+
container:
484+
image: amd64/rust
485+
steps:
486+
- uses: actions/checkout@v4
487+
with:
488+
submodules: true
489+
fetch-depth: 1
490+
- name: Setup Rust toolchain
491+
uses: ./.github/actions/setup-builder
492+
with:
493+
rust-version: stable
494+
- name: Run sqllogictest
495+
# TODO: Right now several tests are failing in Substrait round-trip mode, so this
496+
# command cannot be run for all the .slt files. Run it for just one that works (limit.slt)
497+
# until most of the tickets in https://github.com/apache/datafusion/issues/16248 are addressed
498+
# and this command can be run without filters.
499+
run: cargo test --test sqllogictests -- --substrait-round-trip limit.slt
500+
479501
# Temporarily commenting out the Windows flow, the reason is enormously slow running build
480502
# Waiting for new Windows 2025 github runner
481503
# Details: https://github.com/apache/datafusion/issues/13726

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/sqllogictest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ chrono = { workspace = true, optional = true }
4545
clap = { version = "4.5.39", features = ["derive", "env"] }
4646
datafusion = { workspace = true, default-features = true, features = ["avro"] }
4747
datafusion-spark = { workspace = true, default-features = true }
48+
datafusion-substrait = { path = "../substrait" }
4849
futures = { workspace = true }
4950
half = { workspace = true, default-features = true }
5051
indicatif = "0.17"

datafusion/sqllogictest/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,27 @@ Tests that need to write temporary files should write (only) to this
291291
directory to ensure they do not interfere with others concurrently
292292
running tests.
293293

294+
## Running tests: Substrait round-trip mode
295+
296+
This mode will run all the .slt test files in validation mode, adding a Substrait conversion round-trip for each
297+
generated DataFusion logical plan (SQL statement → DF logical → Substrait → DF logical → DF physical → execute).
298+
299+
Not all statements will be round-tripped, some statements like CREATE, INSERT, SET or EXPLAIN statements will be
300+
issued as is, but any other statement will be round-tripped to/from Substrait.
301+
302+
_WARNING_: as there are still a lot of failures in this mode (https://github.com/apache/datafusion/issues/16248),
303+
it is not enforced in the CI, instead, it needs to be run manually with the following command:
304+
305+
```shell
306+
cargo test --test sqllogictests -- --substrait-round-trip
307+
```
308+
309+
For focusing on one specific failing test, a file:line filter can be used:
310+
311+
```shell
312+
cargo test --test sqllogictests -- --substrait-round-trip binary.slt:23
313+
```
314+
294315
## `.slt` file format
295316

296317
[`sqllogictest`] was originally written for SQLite to verify the

datafusion/sqllogictest/bin/sqllogictests.rs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use datafusion::common::utils::get_available_parallelism;
2121
use datafusion::common::{exec_err, DataFusionError, Result};
2222
use datafusion_sqllogictest::{
2323
df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file,
24-
should_skip_record, value_normalizer, DataFusion, Filter, TestContext,
24+
should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip,
25+
Filter, TestContext,
2526
};
2627
use futures::stream::StreamExt;
2728
use indicatif::{
@@ -102,6 +103,7 @@ async fn run_tests() -> Result<()> {
102103
// to stdout and return OK so they can continue listing other tests.
103104
return Ok(());
104105
}
106+
105107
options.warn_on_ignored();
106108

107109
#[cfg(feature = "postgres")]
@@ -138,8 +140,22 @@ async fn run_tests() -> Result<()> {
138140
let filters = options.filters.clone();
139141

140142
SpawnedTask::spawn(async move {
141-
match (options.postgres_runner, options.complete) {
142-
(false, false) => {
143+
match (
144+
options.postgres_runner,
145+
options.complete,
146+
options.substrait_round_trip,
147+
) {
148+
(_, _, true) => {
149+
run_test_file_substrait_round_trip(
150+
test_file,
151+
validator,
152+
m_clone,
153+
m_style_clone,
154+
filters.as_ref(),
155+
)
156+
.await?
157+
}
158+
(false, false, _) => {
143159
run_test_file(
144160
test_file,
145161
validator,
@@ -149,11 +165,11 @@ async fn run_tests() -> Result<()> {
149165
)
150166
.await?
151167
}
152-
(false, true) => {
168+
(false, true, _) => {
153169
run_complete_file(test_file, validator, m_clone, m_style_clone)
154170
.await?
155171
}
156-
(true, false) => {
172+
(true, false, _) => {
157173
run_test_file_with_postgres(
158174
test_file,
159175
validator,
@@ -163,7 +179,7 @@ async fn run_tests() -> Result<()> {
163179
)
164180
.await?
165181
}
166-
(true, true) => {
182+
(true, true, _) => {
167183
run_complete_file_with_postgres(
168184
test_file,
169185
validator,
@@ -210,6 +226,45 @@ async fn run_tests() -> Result<()> {
210226
}
211227
}
212228

229+
async fn run_test_file_substrait_round_trip(
230+
test_file: TestFile,
231+
validator: Validator,
232+
mp: MultiProgress,
233+
mp_style: ProgressStyle,
234+
filters: &[Filter],
235+
) -> Result<()> {
236+
let TestFile {
237+
path,
238+
relative_path,
239+
} = test_file;
240+
let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else {
241+
info!("Skipping: {}", path.display());
242+
return Ok(());
243+
};
244+
setup_scratch_dir(&relative_path)?;
245+
246+
let count: u64 = get_record_count(&path, "DatafusionSubstraitRoundTrip".to_string());
247+
let pb = mp.add(ProgressBar::new(count));
248+
249+
pb.set_style(mp_style);
250+
pb.set_message(format!("{:?}", &relative_path));
251+
252+
let mut runner = sqllogictest::Runner::new(|| async {
253+
Ok(DataFusionSubstraitRoundTrip::new(
254+
test_ctx.session_ctx().clone(),
255+
relative_path.clone(),
256+
pb.clone(),
257+
))
258+
});
259+
runner.add_label("DatafusionSubstraitRoundTrip");
260+
runner.with_column_validator(strict_column_validator);
261+
runner.with_normalizer(value_normalizer);
262+
runner.with_validator(validator);
263+
let res = run_file_in_runner(path, runner, filters).await;
264+
pb.finish_and_clear();
265+
res
266+
}
267+
213268
async fn run_test_file(
214269
test_file: TestFile,
215270
validator: Validator,
@@ -578,6 +633,14 @@ struct Options {
578633
)]
579634
postgres_runner: bool,
580635

636+
#[clap(
637+
long,
638+
conflicts_with = "complete",
639+
conflicts_with = "postgres_runner",
640+
help = "Before executing each query, convert its logical plan to Substrait and from Substrait back to its logical plan"
641+
)]
642+
substrait_round_trip: bool,
643+
581644
#[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")]
582645
include_sqlite: bool,
583646

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod runner;
19+
20+
pub use runner::*;
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
use std::{path::PathBuf, time::Duration};
20+
21+
use crate::engines::datafusion_engine::Result;
22+
use crate::engines::output::{DFColumnType, DFOutput};
23+
use crate::{convert_batches, convert_schema_to_types, DFSqlLogicTestError};
24+
use arrow::record_batch::RecordBatch;
25+
use async_trait::async_trait;
26+
use datafusion::logical_expr::LogicalPlan;
27+
use datafusion::physical_plan::common::collect;
28+
use datafusion::physical_plan::execute_stream;
29+
use datafusion::prelude::SessionContext;
30+
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
31+
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
32+
use indicatif::ProgressBar;
33+
use log::Level::{Debug, Info};
34+
use log::{debug, log_enabled, warn};
35+
use sqllogictest::DBOutput;
36+
use tokio::time::Instant;
37+
38+
pub struct DataFusionSubstraitRoundTrip {
39+
ctx: SessionContext,
40+
relative_path: PathBuf,
41+
pb: ProgressBar,
42+
}
43+
44+
impl DataFusionSubstraitRoundTrip {
45+
pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
46+
Self {
47+
ctx,
48+
relative_path,
49+
pb,
50+
}
51+
}
52+
53+
fn update_slow_count(&self) {
54+
let msg = self.pb.message();
55+
let split: Vec<&str> = msg.split(" ").collect();
56+
let mut current_count = 0;
57+
58+
if split.len() > 2 {
59+
// third match will be current slow count
60+
current_count = split[2].parse::<i32>().unwrap();
61+
}
62+
63+
current_count += 1;
64+
65+
self.pb
66+
.set_message(format!("{} - {} took > 500 ms", split[0], current_count));
67+
}
68+
}
69+
70+
#[async_trait]
71+
impl sqllogictest::AsyncDB for DataFusionSubstraitRoundTrip {
72+
type Error = DFSqlLogicTestError;
73+
type ColumnType = DFColumnType;
74+
75+
async fn run(&mut self, sql: &str) -> Result<DFOutput> {
76+
if log_enabled!(Debug) {
77+
debug!(
78+
"[{}] Running query: \"{}\"",
79+
self.relative_path.display(),
80+
sql
81+
);
82+
}
83+
84+
let start = Instant::now();
85+
let result = run_query_substrait_round_trip(&self.ctx, sql).await;
86+
let duration = start.elapsed();
87+
88+
if duration.gt(&Duration::from_millis(500)) {
89+
self.update_slow_count();
90+
}
91+
92+
self.pb.inc(1);
93+
94+
if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
95+
warn!(
96+
"[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"",
97+
self.relative_path.display()
98+
);
99+
}
100+
101+
result
102+
}
103+
104+
/// Engine name of current database.
105+
fn engine_name(&self) -> &str {
106+
"DataFusionSubstraitRoundTrip"
107+
}
108+
109+
/// `DataFusion` calls this function to perform sleep.
110+
///
111+
/// The default implementation is `std::thread::sleep`, which is universal to any async runtime
112+
/// but would block the current thread. If you are running in tokio runtime, you should override
113+
/// this by `tokio::time::sleep`.
114+
async fn sleep(dur: Duration) {
115+
tokio::time::sleep(dur).await;
116+
}
117+
118+
async fn shutdown(&mut self) {}
119+
}
120+
121+
async fn run_query_substrait_round_trip(
122+
ctx: &SessionContext,
123+
sql: impl Into<String>,
124+
) -> Result<DFOutput> {
125+
let df = ctx.sql(sql.into().as_str()).await?;
126+
let task_ctx = Arc::new(df.task_ctx());
127+
128+
let state = ctx.state();
129+
let round_tripped_plan = match df.logical_plan() {
130+
// Substrait does not handle these plans
131+
LogicalPlan::Ddl(_)
132+
| LogicalPlan::Explain(_)
133+
| LogicalPlan::Dml(_)
134+
| LogicalPlan::Copy(_)
135+
| LogicalPlan::Statement(_) => df.logical_plan().clone(),
136+
// For any other plan, convert to Substrait
137+
logical_plan => {
138+
let plan = to_substrait_plan(logical_plan, &state)?;
139+
from_substrait_plan(&state, &plan).await?
140+
}
141+
};
142+
143+
let physical_plan = state.create_physical_plan(&round_tripped_plan).await?;
144+
let stream = execute_stream(physical_plan, task_ctx)?;
145+
let types = convert_schema_to_types(stream.schema().fields());
146+
let results: Vec<RecordBatch> = collect(stream).await?;
147+
let rows = convert_batches(results, false)?;
148+
149+
if rows.is_empty() && types.is_empty() {
150+
Ok(DBOutput::StatementComplete(0))
151+
} else {
152+
Ok(DBOutput::Rows { types, rows })
153+
}
154+
}

datafusion/sqllogictest/src/engines/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
/// Implementation of sqllogictest for datafusion.
1919
mod conversion;
2020
mod datafusion_engine;
21+
mod datafusion_substrait_roundtrip_engine;
2122
mod output;
2223

2324
pub use datafusion_engine::convert_batches;
2425
pub use datafusion_engine::convert_schema_to_types;
2526
pub use datafusion_engine::DFSqlLogicTestError;
2627
pub use datafusion_engine::DataFusion;
28+
pub use datafusion_substrait_roundtrip_engine::DataFusionSubstraitRoundTrip;
2729
pub use output::DFColumnType;
2830
pub use output::DFOutput;
2931

datafusion/sqllogictest/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub use engines::DFColumnType;
3434
pub use engines::DFOutput;
3535
pub use engines::DFSqlLogicTestError;
3636
pub use engines::DataFusion;
37+
pub use engines::DataFusionSubstraitRoundTrip;
3738

3839
#[cfg(feature = "postgres")]
3940
pub use engines::Postgres;

0 commit comments

Comments
 (0)