Skip to content

chore(core): bump DataFusion version for bug fixing #810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 285 additions & 38 deletions wren-modeling-py/Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion wren-modeling-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ version = "0.1.0"

[workspace.dependencies]
async-trait = "0.1.80"
datafusion = { version = "42.0.0", default-features = false }
# We require the following commits
# https://github.com/apache/datafusion/pull/12605
# https://github.com/apache/datafusion/pull/12603
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "35adf47fdbd626d79051799921146b96e3345e3b" }
env_logger = "0.11.3"
log = { version = "0.4.14" }
serde = { version = "1.0.201", features = ["derive", "rc"] }
Expand Down
7 changes: 7 additions & 0 deletions wren-modeling-rs/core/src/logical_plan/analyze/expand_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::AnalyzerRule;
use std::fmt::Debug;
use std::sync::Arc;

pub struct ExpandWrenViewRule {
Expand All @@ -25,6 +26,12 @@ impl ExpandWrenViewRule {
}
}

impl Debug for ExpandWrenViewRule {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExpandWrenViewRule").finish()
}
}

impl AnalyzerRule for ExpandWrenViewRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
let plan = plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use datafusion::logical_expr::{
use datafusion::optimizer::AnalyzerRule;
use std::cell::{RefCell, RefMut};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;

/// [ModelAnalyzeRule] responsible for analyzing the model plan node. Turn TableScan from a model to a ModelPlanNode.
Expand All @@ -34,6 +35,12 @@ pub struct ModelAnalyzeRule {
session_state: SessionStateRef,
}

impl Debug for ModelAnalyzeRule {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ModelAnalyzeRule").finish()
}
}

impl AnalyzerRule for ModelAnalyzeRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
let root = RefCell::new(Scope::new());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::sync::Arc;

use datafusion::common::config::ConfigOptions;
Expand Down Expand Up @@ -206,6 +207,12 @@ impl ModelGenerationRule {
}
}

impl Debug for ModelGenerationRule {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ModelGenerationRule").finish()
}
}

impl AnalyzerRule for ModelGenerationRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
let transformed_up = plan
Expand Down
24 changes: 24 additions & 0 deletions wren-modeling-rs/core/src/logical_plan/analyze/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,12 @@ fn merge_graph(
Ok(())
}

impl PartialOrd for ModelPlanNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl UserDefinedLogicalNodeCore for ModelPlanNode {
fn name(&self) -> &str {
"Model"
Expand Down Expand Up @@ -794,6 +800,12 @@ impl ModelSourceNode {
}
}

impl PartialOrd for ModelSourceNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl UserDefinedLogicalNodeCore for ModelSourceNode {
fn name(&self) -> &str {
"ModelSource"
Expand Down Expand Up @@ -889,6 +901,12 @@ impl CalculationPlanNode {
}
}

impl PartialOrd for CalculationPlanNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl UserDefinedLogicalNodeCore for CalculationPlanNode {
fn name(&self) -> &str {
"Calculation"
Expand Down Expand Up @@ -944,6 +962,12 @@ impl PartialModelPlanNode {
}
}

impl PartialOrd for PartialModelPlanNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl UserDefinedLogicalNodeCore for PartialModelPlanNode {
fn name(&self) -> &str {
"PartialModel"
Expand Down
1 change: 1 addition & 0 deletions wren-modeling-rs/core/src/mdl/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn register_table_with_mdl(
Ok(())
}

#[derive(Debug)]
pub struct WrenDataSource {
schema: SchemaRef,
}
Expand Down
16 changes: 10 additions & 6 deletions wren-modeling-rs/core/src/mdl/function.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::internal_err;
use datafusion::common::Result;
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::function::{AccumulatorArgs, WindowUDFFieldArgs};
use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, ColumnarValue, PartitionEvaluator, ScalarUDFImpl,
Signature, TypeSignature, Volatility, WindowUDFImpl,
Expand Down Expand Up @@ -129,13 +129,17 @@ impl WindowUDFImpl for ByPassWindowFunction {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(self.return_type.clone())
}

fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
internal_err!("This function should not be called")
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(
field_args.name(),
self.return_type.clone(),
false,
))
}
}

#[cfg(test)]
Expand Down
3 changes: 1 addition & 2 deletions wren-modeling-rs/sqllogictest/test_files/tpch/q14.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.

# the value should be `15.486545812284` but sqllogictests will round it to `15.48654581`
query R
select
100.00 * sum(case
Expand All @@ -32,5 +31,5 @@ where
and l_shipdate >= date '1995-09-01'
and l_shipdate < date '1995-10-01';
----
15.48654581
15.486545812284

Loading