|
1 | 1 | package sqlline;
|
2 | 2 |
|
| 3 | +import com.linkedin.hoptimator.Database; |
3 | 4 | import java.nio.charset.StandardCharsets;
|
4 | 5 | import java.sql.SQLException;
|
5 | 6 | import java.util.Arrays;
|
|
11 | 12 | import java.util.Properties;
|
12 | 13 | import java.util.Scanner;
|
13 | 14 |
|
| 15 | +import org.apache.calcite.jdbc.CalcitePrepare; |
| 16 | +import org.apache.calcite.jdbc.CalciteSchema; |
14 | 17 | import org.apache.calcite.plan.RelOptTable;
|
15 | 18 | import org.apache.calcite.rel.RelRoot;
|
| 19 | +import org.apache.calcite.rel.type.RelDataType; |
| 20 | +import org.apache.calcite.rel.type.RelDataTypeFactory; |
| 21 | +import org.apache.calcite.rel.type.RelDataTypeImpl; |
| 22 | +import org.apache.calcite.rel.type.RelDataTypeSystem; |
| 23 | +import org.apache.calcite.rel.type.RelProtoDataType; |
| 24 | +import org.apache.calcite.schema.SchemaPlus; |
| 25 | +import org.apache.calcite.schema.Table; |
| 26 | +import org.apache.calcite.sql.SqlCall; |
| 27 | +import org.apache.calcite.sql.SqlIdentifier; |
| 28 | +import org.apache.calcite.sql.SqlKind; |
| 29 | +import org.apache.calcite.sql.SqlNode; |
| 30 | +import org.apache.calcite.sql.SqlNodeList; |
| 31 | +import org.apache.calcite.sql.SqlSelect; |
| 32 | +import org.apache.calcite.sql.ddl.SqlCreateMaterializedView; |
| 33 | +import org.apache.calcite.sql.dialect.CalciteSqlDialect; |
| 34 | +import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| 35 | +import org.apache.calcite.sql.parser.SqlParserPos; |
| 36 | +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; |
| 37 | +import org.apache.calcite.util.Pair; |
| 38 | +import org.apache.calcite.util.Util; |
16 | 39 | import org.jline.reader.Completer;
|
17 | 40 |
|
18 | 41 | import com.linkedin.hoptimator.Pipeline;
|
@@ -96,13 +119,29 @@ public void execute(String line, DispatchCallback dispatchCallback) {
|
96 | 119 | String sql = split[1];
|
97 | 120 | HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
|
98 | 121 | try {
|
99 |
| - RelRoot root = HoptimatorDriver.convert(conn, sql).root; |
| 122 | + String querySql = sql; |
| 123 | + SqlCreateMaterializedView create = null; |
| 124 | + SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql); |
| 125 | + if (sqlNode.getKind().belongsTo(SqlKind.DDL)) { |
| 126 | + if (sqlNode instanceof SqlCreateMaterializedView) { |
| 127 | + create = (SqlCreateMaterializedView) sqlNode; |
| 128 | + final SqlNode q = renameColumns(create.columnList, create.query); |
| 129 | + querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql(); |
| 130 | + } else { |
| 131 | + sqlline.error("Unsupported DDL statement: " + sql); |
| 132 | + dispatchCallback.setToFailure(); |
| 133 | + return; |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + RelRoot root = HoptimatorDriver.convert(conn, querySql).root; |
100 | 138 | Properties connectionProperties = conn.connectionProperties();
|
101 | 139 | RelOptTable table = root.rel.getTable();
|
102 | 140 | if (table != null) {
|
103 | 141 | connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
|
104 | 142 | }
|
105 | 143 | PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
|
| 144 | + setSink(conn, plan, create, querySql); |
106 | 145 | sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI));
|
107 | 146 | } catch (SQLException e) {
|
108 | 147 | sqlline.error(e);
|
@@ -237,14 +276,32 @@ public void execute(String line, DispatchCallback dispatchCallback) {
|
237 | 276 | }
|
238 | 277 | String sql = split[1];
|
239 | 278 | HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
|
240 |
| - RelRoot root = HoptimatorDriver.convert(conn, sql).root; |
241 | 279 | try {
|
| 280 | + String querySql = sql; |
| 281 | + SqlCreateMaterializedView create = null; |
| 282 | + SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql); |
| 283 | + if (sqlNode.getKind().belongsTo(SqlKind.DDL)) { |
| 284 | + if (sqlNode instanceof SqlCreateMaterializedView) { |
| 285 | + create = (SqlCreateMaterializedView) sqlNode; |
| 286 | + final SqlNode q = renameColumns(create.columnList, create.query); |
| 287 | + querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql(); |
| 288 | + } else { |
| 289 | + sqlline.error("Unsupported DDL statement: " + sql); |
| 290 | + dispatchCallback.setToFailure(); |
| 291 | + return; |
| 292 | + } |
| 293 | + } |
| 294 | + |
| 295 | + RelRoot root = HoptimatorDriver.convert(conn, querySql).root; |
242 | 296 | Properties connectionProperties = conn.connectionProperties();
|
243 | 297 | RelOptTable table = root.rel.getTable();
|
244 | 298 | if (table != null) {
|
245 | 299 | connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
|
246 | 300 | }
|
247 |
| - Pipeline pipeline = DeploymentService.plan(root, conn.materializations(), connectionProperties).pipeline("sink", conn); |
| 301 | + PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties); |
| 302 | + setSink(conn, plan, create, querySql); |
| 303 | + String viewName = create == null ? "sink" : viewName(create.name); |
| 304 | + Pipeline pipeline = plan.pipeline(viewName, conn); |
248 | 305 | List<String> specs = new ArrayList<>();
|
249 | 306 | for (Source source : pipeline.sources()) {
|
250 | 307 | specs.addAll(DeploymentService.specify(source, conn));
|
@@ -325,4 +382,73 @@ public boolean echoToFile() {
|
325 | 382 | private static boolean startsWith(String s, String prefix) {
|
326 | 383 | return s.matches("(?i)" + prefix + ".*");
|
327 | 384 | }
|
| 385 | + |
| 386 | + private static void setSink(HoptimatorConnection conn, PipelineRel.Implementor plan, |
| 387 | + SqlCreateMaterializedView create, String querySql) throws SQLException { |
| 388 | + if (create == null) { |
| 389 | + return; |
| 390 | + } |
| 391 | + final Pair<CalciteSchema, String> pair = schema(conn.createPrepareContext(), create.name); |
| 392 | + String database = ((Database) pair.left.schema).databaseName(); |
| 393 | + final List<String> schemaPath = pair.left.path(null); |
| 394 | + List<String> sinkPath = new ArrayList<>(schemaPath); |
| 395 | + String sinkName = pair.right.split("\\$", 2)[0]; |
| 396 | + sinkPath.add(sinkName); |
| 397 | + |
| 398 | + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); |
| 399 | + CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(conn, querySql); |
| 400 | + RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType); |
| 401 | + RelDataType viewRowType = protoType.apply(typeFactory); |
| 402 | + |
| 403 | + final SchemaPlus schemaPlus = pair.left.plus(); |
| 404 | + Table sink = schemaPlus.getTable(sinkName); |
| 405 | + final RelDataType rowType; |
| 406 | + if (sink != null) { |
| 407 | + // For "partial views", the sink may already exist. Use the existing row type. |
| 408 | + rowType = sink.getRowType(typeFactory); |
| 409 | + } else { |
| 410 | + // For normal views, we create the sink based on the view row type. |
| 411 | + rowType = viewRowType; |
| 412 | + } |
| 413 | + |
| 414 | + plan.setSink(database, sinkPath, rowType, Collections.emptyMap()); |
| 415 | + } |
| 416 | + |
| 417 | + private static SqlNode renameColumns(SqlNodeList columnList, SqlNode query) { |
| 418 | + if (columnList == null) { |
| 419 | + return query; |
| 420 | + } |
| 421 | + final SqlParserPos p = query.getParserPosition(); |
| 422 | + final SqlNodeList selectList = SqlNodeList.SINGLETON_STAR; |
| 423 | + final SqlCall from = SqlStdOperatorTable.AS.createCall(p, |
| 424 | + Arrays.asList(query, new SqlIdentifier("_", p), columnList)); |
| 425 | + return new SqlSelect(p, null, selectList, from, null, null, null, null, null, null, null, null, null); |
| 426 | + } |
| 427 | + |
| 428 | + private static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context, SqlIdentifier id) { |
| 429 | + final String name; |
| 430 | + final List<String> path; |
| 431 | + if (id.isSimple()) { |
| 432 | + path = context.getDefaultSchemaPath(); |
| 433 | + name = id.getSimple(); |
| 434 | + } else { |
| 435 | + path = Util.skipLast(id.names); |
| 436 | + name = Util.last(id.names); |
| 437 | + } |
| 438 | + CalciteSchema schema = context.getRootSchema(); |
| 439 | + for (String p : path) { |
| 440 | + schema = Objects.requireNonNull(schema).getSubSchema(p, true); |
| 441 | + } |
| 442 | + return Pair.of(schema, name); |
| 443 | + } |
| 444 | + |
| 445 | + private static String viewName(SqlIdentifier id) { |
| 446 | + final String name; |
| 447 | + if (id.isSimple()) { |
| 448 | + name = id.getSimple(); |
| 449 | + } else { |
| 450 | + name = Util.last(id.names); |
| 451 | + } |
| 452 | + return name; |
| 453 | + } |
328 | 454 | }
|
0 commit comments