Skip to content

Commit f2e4444

Browse files
authored
Unflatten JDBC sources before planning (#93)
Integration tests not expected to pass.
1 parent 089fdac commit f2e4444

File tree

8 files changed

+177
-98
lines changed

8 files changed

+177
-98
lines changed

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
1818
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
1919
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseSpec;
20-
import com.linkedin.hoptimator.util.HoptimatorJdbcSchema;
20+
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
2121

2222

2323
public class K8sDatabaseTable extends K8sTable<V1alpha1Database, V1alpha1DatabaseList, K8sDatabaseTable.Row> {

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
1717
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
1818
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineSpec;
19-
import com.linkedin.hoptimator.util.HoptimatorJdbcSchema;
19+
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
2020

2121

2222
public class K8sEngineTable extends K8sTable<V1alpha1Engine, V1alpha1EngineList, K8sEngineTable.Row> {

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java

Lines changed: 0 additions & 65 deletions
This file was deleted.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.linkedin.hoptimator.util.planner;
2+
3+
import java.util.List;
4+
import javax.sql.DataSource;
5+
6+
import org.apache.calcite.adapter.jdbc.JdbcSchema;
7+
import org.apache.calcite.adapter.jdbc.JdbcTable;
8+
import org.apache.calcite.linq4j.tree.Expression;
9+
import org.apache.calcite.schema.Schema;
10+
import org.apache.calcite.schema.Schemas;
11+
import org.apache.calcite.schema.SchemaPlus;
12+
import org.apache.calcite.schema.SchemaVersion;
13+
import org.apache.calcite.schema.Table;
14+
import org.apache.calcite.sql.SqlDialect;
15+
import org.apache.calcite.sql.SqlDialectFactory;
16+
import org.apache.calcite.sql.SqlDialectFactoryImpl;
17+
18+
import com.linkedin.hoptimator.Database;
19+
import com.linkedin.hoptimator.Engine;
20+
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcConvention;
21+
22+
23+
public class HoptimatorJdbcSchema extends JdbcSchema implements Database {
24+
25+
private final String database;
26+
private final List<Engine> engines;
27+
private final HoptimatorJdbcConvention convention;
28+
29+
public static HoptimatorJdbcSchema create(String database, String schema, DataSource dataSource,
30+
SchemaPlus parentSchema, SqlDialect dialect, List<Engine> engines) {
31+
Expression expression = Schemas.subSchemaExpression(parentSchema, schema, HoptimatorJdbcSchema.class);
32+
HoptimatorJdbcConvention convention = new HoptimatorJdbcConvention(dialect, expression, database, engines);
33+
return new HoptimatorJdbcSchema(database, schema, dataSource, dialect, convention, engines);
34+
}
35+
36+
public HoptimatorJdbcSchema(String database, String schema, DataSource dataSource, SqlDialect dialect,
37+
HoptimatorJdbcConvention convention, List<Engine> engines) {
38+
super(dataSource, dialect, convention, null, schema);
39+
this.database = database;
40+
this.engines = engines;
41+
this.convention = convention;
42+
}
43+
44+
@Override
45+
public String databaseName() {
46+
return database;
47+
}
48+
49+
public List<Engine> engines() {
50+
return engines;
51+
}
52+
53+
@Override
54+
public Table getTable(String name) {
55+
return new HoptimatorJdbcTable((JdbcTable) super.getTable(name), convention);
56+
}
57+
58+
@Override
59+
public Schema snapshot(SchemaVersion version) {
60+
return this;
61+
}
62+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package com.linkedin.hoptimator.util.planner;
2+
3+
import java.util.Collection;
4+
import java.util.List;
5+
6+
import com.linkedin.hoptimator.util.DataTypeUtils;
7+
8+
import org.apache.calcite.adapter.java.AbstractQueryableTable;
9+
import org.apache.calcite.adapter.java.JavaTypeFactory;
10+
import org.apache.calcite.adapter.jdbc.JdbcTable;
11+
import org.apache.calcite.adapter.jdbc.JdbcTableScan;
12+
import org.apache.calcite.linq4j.QueryProvider;
13+
import org.apache.calcite.linq4j.Queryable;
14+
import org.apache.calcite.plan.RelOptCluster;
15+
import org.apache.calcite.plan.RelOptTable;
16+
import org.apache.calcite.prepare.Prepare.CatalogReader;
17+
import org.apache.calcite.rel.RelNode;
18+
import org.apache.calcite.rel.core.TableModify;
19+
import org.apache.calcite.rel.type.RelDataType;
20+
import org.apache.calcite.rel.type.RelDataTypeFactory;
21+
import org.apache.calcite.rex.RexNode;
22+
import org.apache.calcite.schema.ModifiableTable;
23+
import org.apache.calcite.schema.Table;
24+
import org.apache.calcite.schema.SchemaPlus;
25+
import org.apache.calcite.schema.TranslatableTable;
26+
import org.apache.calcite.schema.impl.AbstractTableQueryable;
27+
28+
29+
public class HoptimatorJdbcTable extends AbstractQueryableTable implements TranslatableTable,
30+
ModifiableTable {
31+
32+
private final JdbcTable jdbcTable;
33+
private final HoptimatorJdbcConvention convention;
34+
35+
public HoptimatorJdbcTable(JdbcTable jdbcTable, HoptimatorJdbcConvention convention) {
36+
super(Object[].class);
37+
this.jdbcTable = jdbcTable;
38+
this.convention = convention;
39+
}
40+
41+
public JdbcTable jdbcTable() {
42+
return jdbcTable;
43+
}
44+
45+
@Override
46+
public RelDataType getRowType(RelDataTypeFactory factory) {
47+
return DataTypeUtils.unflatten(jdbcTable.getRowType(factory), factory);
48+
}
49+
50+
@Override
51+
public RelNode toRel(RelOptTable.ToRelContext context,
52+
RelOptTable relOptTable) {
53+
return new HoptimatorJdbcTableScan(context.getCluster(), context.getTableHints(), relOptTable, this,
54+
convention);
55+
}
56+
57+
@Override
58+
public TableModify toModificationRel(RelOptCluster cluster,
59+
RelOptTable table, CatalogReader catalogReader, RelNode input,
60+
TableModify.Operation operation, List<String> updateColumnList,
61+
List<RexNode> sourceExpressionList, boolean flattened) {
62+
return jdbcTable.toModificationRel(cluster, table, catalogReader, input, operation,
63+
updateColumnList, sourceExpressionList, flattened);
64+
}
65+
66+
@Override
67+
public Collection getModifiableCollection() {
68+
return null;
69+
}
70+
71+
@Override
72+
public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
73+
SchemaPlus schema, String tableName) {
74+
return jdbcTable.asQueryable(queryProvider, schema, tableName);
75+
}
76+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.linkedin.hoptimator.util.planner;
2+
3+
import org.apache.calcite.adapter.jdbc.JdbcTable;
4+
import org.apache.calcite.adapter.jdbc.JdbcTableScan;
5+
import org.apache.calcite.plan.Convention;
6+
import org.apache.calcite.plan.RelOptCluster;
7+
import org.apache.calcite.plan.RelOptTable;
8+
import org.apache.calcite.plan.RelTraitSet;
9+
import org.apache.calcite.rel.RelNode;
10+
import org.apache.calcite.rel.core.TableScan;
11+
import org.apache.calcite.rel.hint.RelHint;
12+
13+
import com.google.common.collect.ImmutableList;
14+
15+
import java.util.List;
16+
17+
import static org.apache.calcite.linq4j.Nullness.castNonNull;
18+
19+
20+
public class HoptimatorJdbcTableScan extends JdbcTableScan {
21+
public final HoptimatorJdbcTable jdbcTable;
22+
23+
protected HoptimatorJdbcTableScan(
24+
RelOptCluster cluster,
25+
List<RelHint> hints,
26+
RelOptTable table,
27+
HoptimatorJdbcTable jdbcTable,
28+
HoptimatorJdbcConvention jdbcConvention) {
29+
super(cluster, hints, table, jdbcTable.jdbcTable(), jdbcConvention);
30+
this.jdbcTable = jdbcTable;
31+
}
32+
}

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ static class PipelineTableScan extends TableScan implements PipelineRel {
8989
@Override
9090
public void implement(Implementor implementor) throws SQLException {
9191
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
92-
implementor.addSource(database, table.getQualifiedName(),
93-
DataTypeUtils.unflatten(table.getRowType(), typeFactory),
92+
implementor.addSource(database, table.getQualifiedName(), table.getRowType(),
9493
Collections.emptyMap()); // TODO pass in table scan hints
9594
}
9695
}

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,12 @@ public QueryImplementor(RelNode relNode) {
166166
public void implement(SqlWriter w) {
167167
RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
168168
SqlImplementor.Result result = converter.visitRoot(relNode);
169-
SqlSelect select = result.asSelect();
170-
if (select.getSelectList() != null) {
169+
SqlNode node = result.asStatement();
170+
if (node instanceof SqlSelect && ((SqlSelect) node).getSelectList() != null) {
171+
SqlSelect select = (SqlSelect) node;
171172
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
172173
}
173-
select.accept(new UnflattenMemberAccess(this)).unparse(w, 0, 0);
174+
node.unparse(w, 0, 0);
174175
}
175176

176177
// A `ROW(...)` operator which will unparse as just `(...)`.
@@ -189,32 +190,6 @@ public SqlNode visit(SqlCall call) {
189190
}
190191
}
191192
};
192-
193-
private static class UnflattenMemberAccess extends SqlShuttle {
194-
private final Set<String> sinkFieldList;
195-
196-
UnflattenMemberAccess(QueryImplementor outer) {
197-
this.sinkFieldList = outer.relNode.getRowType().getFieldList()
198-
.stream()
199-
.map(RelDataTypeField::getName)
200-
.collect(Collectors.toSet());
201-
}
202-
203-
// SqlShuttle gets called for every field in SELECT and every table name in FROM alike
204-
// For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR`
205-
@Override
206-
public SqlNode visit(SqlIdentifier id) {
207-
if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) {
208-
id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO));
209-
} else {
210-
SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
211-
.flatMap(x -> Stream.of(x.split("\\$")))
212-
.collect(Collectors.toList()), SqlParserPos.ZERO);
213-
id.assignNamesFrom(replacement);
214-
}
215-
return id;
216-
}
217-
}
218193
}
219194

220195
/**

0 commit comments

Comments
 (0)