Skip to content

Commit fd156b0

Browse files
authored
Resolve complex array types (#95)
* Add configs to status
* Fix various checkstyle complaints
* Resolve complex array types
* Fix integration tests
1 parent f2e4444 commit fd156b0

File tree

71 files changed

+305
-190
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

71 files changed

+305
-190
lines changed

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,55 +79,65 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean
7979
}
8080

8181
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
82+
return rel(schema, typeFactory, false);
83+
}
84+
85+
/** Converts Avro Schema to RelDataType.
86+
* Nullability is preserved except for array element types: JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY",
87+
* causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired.
88+
*/
89+
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
8290
RelDataType unknown = typeFactory.createUnknownType();
8391
switch (schema.getType()) {
8492
case RECORD:
8593
return typeFactory.createStructType(schema.getFields()
8694
.stream()
87-
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
95+
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
8896
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
8997
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
9098
.collect(Collectors.toList()));
9199
case INT:
92-
return createRelType(typeFactory, SqlTypeName.INTEGER);
100+
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
93101
case LONG:
94-
return createRelType(typeFactory, SqlTypeName.BIGINT);
102+
return createRelType(typeFactory, SqlTypeName.BIGINT, nullable);
95103
case ENUM:
96104
case FIXED:
97105
case STRING:
98-
return createRelType(typeFactory, SqlTypeName.VARCHAR);
106+
return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable);
107+
case BYTES:
108+
return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable);
99109
case FLOAT:
100-
return createRelType(typeFactory, SqlTypeName.FLOAT);
110+
return createRelType(typeFactory, SqlTypeName.FLOAT, nullable);
101111
case DOUBLE:
102-
return createRelType(typeFactory, SqlTypeName.DOUBLE);
112+
return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable);
103113
case BOOLEAN:
104-
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
114+
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
105115
case ARRAY:
106-
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
116+
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1);
107117
// TODO support map types
108118
// Appears to require a Calcite version bump
109119
// case MAP:
110-
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
120+
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable));
111121
case UNION:
112122
if (schema.isNullable() && schema.getTypes().size() == 2) {
113123
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
114-
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
124+
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
115125
} else {
116126
// TODO support more elaborate union types
117127
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
118128
}
119129
default:
120-
return typeFactory.createUnknownType();
130+
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), nullable);
121131
}
122132
}
123133

124134
public static RelDataType rel(Schema schema) {
125135
return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
126136
}
127137

128-
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
138+
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
129139
RelDataType rawType = typeFactory.createSqlType(typeName);
130-
return typeFactory.createTypeWithNullability(rawType, false);
140+
return typeFactory.createTypeWithNullability(rawType, nullable);
131141
}
132142

133143
public static RelProtoDataType proto(Schema schema) {

hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import org.junit.Test;
88

99
import static org.junit.Assert.assertEquals;
10+
import static org.junit.Assert.assertFalse;
11+
import static org.junit.Assert.assertNotNull;
1012
import static org.junit.Assert.assertTrue;
1113

1214

@@ -22,18 +24,18 @@ public void convertsNestedSchemas() {
2224
Schema avroSchema1 = (new Schema.Parser()).parse(schemaString);
2325
RelDataType rel1 = AvroConverter.rel(avroSchema1);
2426
assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size());
25-
assertTrue(rel1.toString(), rel1.getField("h", false, false) != null);
27+
assertNotNull(rel1.toString(), rel1.getField("h", false, false));
2628
RelDataType rel2 = rel1.getField("h", false, false).getType();
2729
assertTrue(rel2.toString(), rel2.isNullable());
2830
Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1);
2931
assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size());
30-
assertTrue(rel2.toString(), rel2.getField("A", false, false) != null);
32+
assertNotNull(rel2.toString(), rel2.getField("A", false, false));
3133
RelDataType rel3 = rel2.getField("A", false, false).getType();
3234
assertTrue(rel3.toString(), rel3.isNullable());
3335
Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1);
3436
assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size());
3537
Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
36-
assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable());
38+
assertFalse("!avroSchema4.isNullable()", avroSchema4.isNullable());
3739
assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount());
3840
Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2);
3941
assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable());

hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import org.apache.calcite.rel.type.RelDataType;
44
import org.junit.Test;
55

6-
import static org.junit.Assert.assertTrue;
6+
import static org.junit.Assert.assertEquals;
7+
import static org.junit.Assert.assertNotNull;
8+
import static org.junit.Assert.assertNull;
79

810

911
public class DataTypeTest {
@@ -13,12 +15,12 @@ public void skipsNestedRows() {
1315
DataType.Struct struct =
1416
DataType.struct().with("one", DataType.VARCHAR).with("two", DataType.struct().with("three", DataType.VARCHAR));
1517
RelDataType row1 = struct.rel();
16-
assertTrue(row1.toString(), row1.getFieldCount() == 2);
17-
assertTrue(row1.toString(), row1.getField("one", false, false) != null);
18-
assertTrue(row1.toString(), row1.getField("two", false, false) != null);
18+
assertEquals(row1.toString(), 2, row1.getFieldCount());
19+
assertNotNull(row1.toString(), row1.getField("one", false, false));
20+
assertNotNull(row1.toString(), row1.getField("two", false, false));
1921
RelDataType row2 = struct.dropNestedRows().rel();
20-
assertTrue(row2.toString(), row2.getFieldCount() == 1);
21-
assertTrue(row2.toString(), row2.getField("one", false, false) != null);
22-
assertTrue(row2.toString(), row2.getField("two", false, false) == null);
22+
assertEquals(row2.toString(), 1, row2.getFieldCount());
23+
assertNotNull(row2.toString(), row2.getField("one", false, false));
24+
assertNull(row2.toString(), row2.getField("two", false, false));
2325
}
2426
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.time.Duration;
1010
import java.util.Optional;
1111

12+
import org.apache.commons.lang3.StringUtils;
13+
1214
import io.kubernetes.client.apimachinery.GroupVersion;
1315
import io.kubernetes.client.common.KubernetesListObject;
1416
import io.kubernetes.client.common.KubernetesObject;
@@ -19,7 +21,6 @@
1921
import io.kubernetes.client.util.KubeConfig;
2022
import io.kubernetes.client.util.generic.GenericKubernetesApi;
2123
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
22-
import org.apache.commons.lang3.StringUtils;
2324

2425

2526
public class K8sContext {

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package com.linkedin.hoptimator.k8s;
22

3+
import java.util.Objects;
34
import javax.sql.DataSource;
45

6+
import org.apache.calcite.adapter.jdbc.JdbcSchema;
7+
58
import com.linkedin.hoptimator.Engine;
69
import com.linkedin.hoptimator.SqlDialect;
710

8-
import java.util.Objects;
9-
10-
import org.apache.calcite.adapter.jdbc.JdbcSchema;
11-
1211

1312
public class K8sEngine implements Engine {
1413

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
import java.util.Arrays;
44
import java.util.List;
5-
import java.util.Locale;
65
import java.util.Optional;
76
import java.util.stream.Collectors;
87

9-
import org.apache.calcite.adapter.jdbc.JdbcSchema;
108
import org.apache.calcite.schema.Schema;
119

1210
import io.kubernetes.client.openapi.models.V1ObjectMeta;
@@ -16,7 +14,6 @@
1614
import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
1715
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
1816
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineSpec;
19-
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
2017

2118

2219
public class K8sEngineTable extends K8sTable<V1alpha1Engine, V1alpha1EngineList, K8sEngineTable.Row> {
@@ -56,8 +53,8 @@ public List<Engine> forDatabase(String database) {
5653
public Row toRow(V1alpha1Engine obj) {
5754
return new Row(obj.getMetadata().getName(), obj.getSpec().getUrl(),
5855
Optional.ofNullable(obj.getSpec().getDialect()).map(x -> x.toString()).orElseGet(() -> null),
59-
obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null ?
60-
obj.getSpec().getDatabases().toArray(new String[0]) : null);
56+
obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null
57+
? obj.getSpec().getDatabases().toArray(new String[0]) : null);
6158
}
6259

6360
@Override

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,23 @@
22

33
import java.sql.SQLException;
44
import java.util.List;
5-
import java.util.Locale;
5+
import java.util.Properties;
66
import java.util.function.Function;
77
import java.util.stream.Collectors;
88

99
import com.linkedin.hoptimator.Job;
1010
import com.linkedin.hoptimator.SqlDialect;
1111
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
1212
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
13+
import com.linkedin.hoptimator.util.ConfigService;
1314
import com.linkedin.hoptimator.util.Template;
1415

1516

1617
/** Specifies an abstract Job with concrete YAML by applying JobTemplates. */
1718
class K8sJobDeployer extends K8sYamlDeployer<Job> {
1819

20+
private static final String FLINK_CONFIG = "flink.config";
21+
1922
private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;
2023

2124
K8sJobDeployer(K8sContext context) {
@@ -25,6 +28,8 @@ class K8sJobDeployer extends K8sYamlDeployer<Job> {
2528

2629
@Override
2730
public List<String> specify(Job job) throws SQLException {
31+
Properties properties = ConfigService.config(null, false, FLINK_CONFIG);
32+
properties.putAll(job.sink().options());
2833
Function<SqlDialect, String> sql = job.sql();
2934
String name = K8sUtils.canonicalizeName(job.sink().database(), job.name());
3035
Template.Environment env = new Template.SimpleEnvironment()
@@ -34,6 +39,7 @@ public List<String> specify(Job job) throws SQLException {
3439
.with("table", job.sink().table())
3540
.with("sql", sql.apply(SqlDialect.ANSI))
3641
.with("flinksql", sql.apply(SqlDialect.FLINK))
42+
.with("flinkconfigs", properties)
3743
.with(job.sink().options());
3844
return jobTemplateApi.list()
3945
.stream()

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.sql.SQLException;
44
import java.util.List;
5-
import java.util.Locale;
65
import java.util.stream.Collectors;
76

87
import com.linkedin.hoptimator.Source;

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import java.util.Collection;
44
import java.util.Locale;
5-
import java.util.stream.Stream;
65
import java.util.stream.Collectors;
6+
import java.util.stream.Stream;
77

88
import io.kubernetes.client.common.KubernetesType;
99

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* Database metadata.
3131
*/
3232
@ApiModel(description = "Database metadata.")
33-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-20T21:16:25.561Z[Etc/UTC]")
33+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-21T21:11:41.897Z[Etc/UTC]")
3434
public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject {
3535
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3636
@SerializedName(SERIALIZED_NAME_API_VERSION)

0 commit comments

Comments
 (0)