Skip to content

Fix ScriptImplementor projection when using filters and add many tests #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 8, 2025

Conversation

jogrogan
Copy link
Collaborator

@jogrogan jogrogan commented May 7, 2025

Changes are split into different commits:

  1. Fix some error handling and add error tests (see test cases with !error for more details)
  2. Added tests for Venice primitive keys, add Venice/Kafka tests for materialized views
  3. Update missing copied params in PipelineRules - largely doesn't matter but good for consistency and would be easily missed if we added features using these in the future
  4. Fixed planner to ensure Project rule is top-level for ScriptImplementor. This was noticed when using Filters (WHERE clause), the RelNode used in ScriptImplementor lost all context of field renames from the source to sink schema which caused fields to be dropped when trying to map to the targetFields.
    Copying comment added to code:
    // We need to run the plan without field trimming enabled. The intention of field trimming is to optimize
    // away unused fields. Calcite is able to make micro optimizations to the plan but at the cost of making
    // no guarantees about what that fields will be named in the resulting RelNode. For the ScriptImplementor,
    // field names are important because they are used to generate the final SQL against the sink table. This will
    // almost always require some top-level Project in the plan, but with trimming enabled, identity projects
    // (just field renames) are optimized out of the plan.
    // See discussion in https://issues.apache.org/jira/browse/CALCITE-1297

I was able to get something similar working by adding a top-level project right before the ScriptImplementor call but this felt more hacky and caused the where clauses to be generated as something strange:

- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `FIRST_NAME` AS `PAGE_URN`, `LAST_NAME` AS `MEMBER_URN` FROM (SELECT `FIRST_NAME`, `LAST_NAME`         FROM `PROFILE`.`MEMBERS`) AS `t` WHERE `FIRST_NAME` = 'Alice'

Copying that solution for visibility:

  ...
  RelNode projection = forceProjection(query, getFilteredTargetFields(targetRowType, targetFields));
  ...
}

// Largely a copy of RootRel.project() with the added feature to be able to input our own targetRowType
// referring to the columns found in the sink.
static RelNode forceProjection(RelNode relNode, RelDataType targetRowType) {
  // If the query is already a projection, just return it
  if (relNode instanceof Project) {
    return relNode;
  }

  RelRoot root = RelRoot.of(relNode, SqlKind.AS);
  assert root.fields.size() == targetRowType.getFieldList().size();

  final List<RexNode> projects = new ArrayList<>(root.fields.size());
  final RexBuilder rexBuilder = root.rel.getCluster().getRexBuilder();
  root.fields.forEach((i, name) -> projects.add(rexBuilder.makeInputRef(root.rel, i)));
  RelTraitSet traitSet = root.rel.getTraitSet().replace(PipelineRel.CONVENTION);
  return new PipelineRules.PipelineProject(root.rel.getCluster(), traitSet, root.hints, root.rel, projects,
      targetRowType);
}

// Filter the target fields to only include those that are present in the sink schema
static RelDataType getFilteredTargetFields(RelDataType targetRowType, ImmutablePairList<Integer, String> targetFields) {
  RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
  return typeFactory.createStructType(
      targetRowType.getFieldList().stream()
          .filter(x -> targetFields.rightList().contains(x.getName()))
          .map(x -> new RelDataTypeFieldImpl(x.getName(), x.getIndex(), x.getType()))
          .collect(Collectors.toList()));
}

Copy link
Collaborator

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impressive! Very subtle bugs here. Love the improved error messages and quidem tests.

final SchemaPlus schemaPlus = pair.left.plus();
if (schemaPlus.getTable(pair.right) instanceof HoptimatorJdbcTable) {
throw new DdlException(create,
"Cannot overwrite physical table " + pair.right + " with a view.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat!

spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

@jogrogan jogrogan merged commit 689300b into main May 8, 2025
1 check passed
@jogrogan jogrogan deleted the jogrogan/filterRule branch May 8, 2025 14:08
ryannedolan pushed a commit that referenced this pull request May 13, 2025
#136)

* Add some missing error handling and tests around errors

* Add tests for venice primitive keys

* Pass through missing properties

* Fix issues with top-level filter passing through wrong fields to ScriptImplementor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants