diff --git a/Makefile b/Makefile
index 7e596ac0..8b75d02f 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,7 @@ build:
 	docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
 	docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator
 
-bounce: build undeploy deploy deploy-samples deploy-config
+bounce: build undeploy deploy
 
 clean:
 	./gradlew clean
@@ -26,6 +26,10 @@ deploy: deploy-config
 	kubectl apply -f ./hoptimator-k8s/src/main/resources/
 	kubectl apply -f ./deploy
 	kubectl apply -f ./deploy/dev/rbac.yaml
+	kubectl wait --for=condition=Established=True \
+		crds/subscriptions.hoptimator.linkedin.com \
+		crds/kafkatopics.hoptimator.linkedin.com \
+		crds/sqljobs.hoptimator.linkedin.com
 
 undeploy: undeploy-config
 	kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping"
@@ -45,9 +49,9 @@ deploy-flink: deploy
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
 	helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
-	kubectl apply -f deploy/dev/flink-session-cluster.yaml
-	kubectl apply -f deploy/dev/flink-sql-gateway.yaml
-	kubectl apply -f deploy/samples/flink-template.yaml
+	kubectl apply -f ./deploy/dev/flink-session-cluster.yaml
+	kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml
+	kubectl apply -f ./deploy/samples/flink-template.yaml
 
 undeploy-flink:
 	kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
@@ -61,8 +65,12 @@ undeploy-flink:
 deploy-kafka: deploy deploy-flink
 	kubectl create namespace kafka || echo "skipping"
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
-	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
 	kubectl apply -f ./deploy/samples/kafkadb.yaml
+	kubectl apply -f ./deploy/dev/kafka.yaml
+	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
+	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
+	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
+	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
 
 undeploy-kafka:
 	kubectl delete kafkatopic.kafka.strimzi.io --all || echo "skipping"
@@ -84,20 +92,12 @@ undeploy-venice:
 	docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down
 
 deploy-dev-environment: deploy deploy-demo deploy-flink deploy-kafka deploy-venice
-	kubectl wait --for=condition=Established=True \
-		crds/subscriptions.hoptimator.linkedin.com \
-		crds/kafkatopics.hoptimator.linkedin.com \
-		crds/sqljobs.hoptimator.linkedin.com
-	kubectl apply -f ./deploy/dev/
 
 undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy-demo undeploy
 	kubectl delete -f ./deploy/dev || echo "skipping"
 
 # Integration test setup intended to be run locally
 integration-tests: deploy-dev-environment
-	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
-	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
 	kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
 	kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
 	kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
@@ -108,9 +108,6 @@ integration-tests: deploy-dev-environment
 
 # kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
 integration-tests-kind: deploy-dev-environment
-	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
-	kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
 	./gradlew intTest -i --no-parallel
 
 generate-models:
diff --git a/README.md b/README.md
index 809e16bb..3a9b5486 100644
--- a/README.md
+++ b/README.md
@@ -76,8 +76,8 @@ Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolatio
 To produce/consume Kafka data, use the following commands:
 
 ```
-  $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
-  $ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
+  $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
+  $ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
 ```
 
 ### Flink
diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml
index c0c2d375..05b9dbc5 100644
--- a/deploy/dev/kafka.yaml
+++ b/deploy/dev/kafka.yaml
@@ -15,15 +15,56 @@
 
 # Based on examples at:
 # https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka
 
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+  name: controller
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: one
+spec:
+  replicas: 3
+  roles:
+    - controller
+  storage:
+    type: jbod
+    volumes:
+      - id: 0
+        type: ephemeral
+        kraftMetadata: shared
+---
+
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+  name: broker
+  namespace: kafka
+  labels:
+    strimzi.io/cluster: one
+spec:
+  replicas: 3
+  roles:
+    - broker
+  storage:
+    type: jbod
+    volumes:
+      - id: 0
+        type: ephemeral
+        kraftMetadata: shared
+---
 apiVersion: kafka.strimzi.io/v1beta2
 kind: Kafka
 metadata:
   name: one
   namespace: kafka
+  annotations:
+    strimzi.io/node-pools: enabled
+    strimzi.io/kraft: enabled
 spec:
   kafka:
-    version: 3.8.0
+    version: 4.0.0
+    metadataVersion: "4.0"
     replicas: 1
     listeners:
       - name: plain
@@ -54,16 +95,7 @@ spec:
       transaction.state.log.min.isr: 1
       default.replication.factor: 1
       min.insync.replicas: 1
-      inter.broker.protocol.version: "3.8"
       allow.everyone.if.no.acl.found: true
-    storage:
-      type: ephemeral
-    authorization:
-      type: simple
-  zookeeper:
-    replicas: 3
-    storage:
-      type: ephemeral
   entityOperator:
     topicOperator:
       watchedNamespace: default
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java
index a9890b81..33c97f2e 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java
@@ -39,7 +39,7 @@ public List<String> path() {
   }
 
   protected String pathString() {
-    return path.stream().collect(Collectors.joining("."));
+    return String.join(".", path);
   }
 
   @Override
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
index d2473fa0..ace9b32c 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
@@ -105,7 +105,7 @@ public boolean valid() {
   /** For convenience only, enabling try-with-resources */
   public void close() {
     closed = true;
-    children.values().forEach(x -> x.checkClosed());
+    children.values().forEach(Issues::checkClosed);
   }
 
   private void emit(String message) {
@@ -130,7 +130,7 @@ private void checkClosed() {
   }
 
   private boolean empty() {
-    return issues.isEmpty() && children.values().stream().allMatch(x -> x.empty());
+    return issues.isEmpty() && children.values().stream().allMatch(Issues::empty);
   }
 
   private String fullPath() {
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/View.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/View.java
index 7b23cefd..2580bf34 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/View.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/View.java
@@ -32,7 +32,7 @@ public List<String> path() {
   }
 
   protected String pathString() {
-    return path.stream().collect(Collectors.joining("."));
+    return String.join(".", path);
   }
 
   @Override
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
index f9c557fd..47871e23 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
@@ -30,19 +30,17 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
     } else {
       switch (dataType.getSqlTypeName()) {
         case INTEGER:
-          return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
         case SMALLINT:
           return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
         case BIGINT:
           return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
         case VARCHAR:
+        case CHAR:
           return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
         case FLOAT:
           return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable());
         case DOUBLE:
           return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
-        case CHAR:
-          return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
         case BOOLEAN:
           return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
         case ARRAY:
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
index 58725dd3..e88c05db 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
@@ -24,7 +24,7 @@ class AvroTableValidator implements Validator {
 
   private final SchemaPlus schema;
 
-  public AvroTableValidator(SchemaPlus schema) {
+  AvroTableValidator(SchemaPlus schema) {
     this.schema = schema;
   }
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
index 099e8c21..d3953910 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
@@ -12,7 +12,6 @@
 
 /** Provides AvroValidator. */
 public class AvroValidatorProvider implements ValidatorProvider {
-  @SuppressWarnings("unchecked")
   @Override
   public <T> Collection<Validator> validators(T obj) {
     if (obj instanceof SchemaPlus) {
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ConfigProvider.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ConfigProvider.java
index fd1459e7..229a10ff 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ConfigProvider.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ConfigProvider.java
@@ -22,7 +22,7 @@ static ConfigProvider from(Map<String, ?> configs) {
     if (configs == null) {
       return empty();
     } else {
-      return x -> configs.entrySet().stream().collect(Collectors.toMap(y -> y.getKey(), y -> y.getValue().toString()));
+      return x -> configs.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().toString()));
     }
   }
 
@@ -32,8 +32,7 @@ default ConfigProvider with(String key, Function<String, String> valueFunction)
       if (base.containsKey(key)) {
         throw new IllegalStateException("Key '" + key + "' previously defined.");
       }
-      Map<String, String> combined = new HashMap<>();
-      combined.putAll(base);
+      Map<String, String> combined = new HashMap<>(base);
       combined.put(key, valueFunction.apply(x));
       return combined;
     };
@@ -45,8 +44,7 @@ default ConfigProvider with(Map<String, ?> configs) {
     }
     return x -> {
       Map<String, String> base = config(x);
-      Map<String, String> combined = new HashMap<>();
-      combined.putAll(base);
+      Map<String, String> combined = new HashMap<>(base);
       configs.forEach((k, v) -> {
         if (base.containsKey(k)) {
           throw new IllegalStateException("Key '" + k + "' previously defined.");
@@ -66,6 +64,6 @@ default ConfigProvider with(String key, Integer value) {
   }
 
   default ConfigProvider withPrefix(String prefix) {
-    return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), y -> y.getValue()));
+    return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), Map.Entry::getValue));
   }
 }
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java
index 67a9106c..b86ea582 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java
@@ -12,7 +12,7 @@
 import org.apache.calcite.sql.type.SqlTypeName;
 
 
-/** Common data types. Not authoratitive or exhaustive. */
+/** Common data types. Not authoritative or exhaustive. */
 public enum DataType {
 
   VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopRel.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopRel.java
index 103a4314..10ea89e8 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopRel.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopRel.java
@@ -6,7 +6,7 @@
 
 /**
  * Calling convention that ultimately gets converted to a Pipeline or similar.
- *
+ * <p>
  * For now, Hoptimator only implements Pipelines (via PipelineRel), which run on
  * Flink. Eventually, PipelineRel may support additional runtimes (e.g. Spark),
  * and/or Hoptimator may support additional calling conventions (e.g. batch jobs
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java
index 47f12904..7126fb89 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTable.java
@@ -17,7 +17,7 @@
 
 /**
  * HopTables can have "baggage", including Resources and arbitrary DDL/SQL.
- *
+ * <p>
  * This mechanism is extremely powerful. In addition to enabling views, we can
  * bring along arbitrary infra required to materialize a view. For example, a
  * table can bring along a CDC stream or a cache. Generally, such Resources
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTableScan.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTableScan.java
index 939d9b80..4543524f 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTableScan.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/HopTableScan.java
@@ -20,7 +20,7 @@ public final class HopTableScan extends TableScan implements HopRel {
   @Override
   public void register(RelOptPlanner planner) {
     planner.addRule(HopTableScanRule.INSTANCE);
-    RuleService.rules().forEach(x -> planner.addRule(x));
+    RuleService.rules().forEach(planner::addRule);
   }
 }
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
index 5a1cb5d4..49158826 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
@@ -26,16 +26,16 @@
 
 /**
  * Represents something required by a Table.
- *
+ * <p>
  * In Hoptimator, Tables can come with "baggage" in the form of Resources,
  * which are essentially YAML files. Each Resource is rendered into YAML
  * with a specific Template. Thus, it's possible to represent Kafka topics,
  * Brooklin datastreams, Flink jobs, and so on, as part of a Table, just
  * by including a corresponding Resource.
- *
+ * <p>
  * Resources are injected into a Pipeline by the planner as needed. Generally,
 * each Resource Template corresponds to a Kubernetes controller.
- *
+ * <p>
 * Resources may optionally link to input Resources, which is used strictly
 * for informational/debugging purposes.
 */
@@ -48,7 +48,7 @@ public abstract class Resource {
 
   /** A Resource which should be rendered with the given template */
   public Resource(String template) {
     this.template = template;
-    export("id", () -> id());
+    export("id", this::id);
   }
 
   /** Copy constructor */
@@ -86,7 +86,7 @@ protected void export(String key, Map<String, String> values) {
 
   /** Export a list of values */
   protected void export(String key, List<String> values) {
-    export(key, values.stream().collect(Collectors.joining("\n")));
+    export(key, String.join("\n", values));
   }
 
   /** Reference an input resource */
@@ -132,7 +132,7 @@ public int hashCode() {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("[ template: " + template() + " ");
+    sb.append("[ template: ").append(template()).append(" ");
     for (Map.Entry<String, Supplier<String>> entry : properties.entrySet()) {
       if (entry.getKey().equals("id")) {
         // special case for "id" to avoid recursion
@@ -199,8 +199,7 @@ protected void exportAll(Map<String, Supplier<String>> properties) {
     }
 
     public SimpleEnvironment with(String key, String value) {
-      Map<String, String> newVars = new HashMap<>();
-      newVars.putAll(vars);
+      Map<String, String> newVars = new HashMap<>(vars);
       newVars.put(key, value);
       return new SimpleEnvironment() {{
         exportAll(newVars);
@@ -258,41 +257,41 @@ public interface Template {
 
   /**
    * Replaces `{{var}}` in a template file with the corresponding variable.
-   *
+   * <p>
   * Resource-scoped variables take precedence over Environment-scoped
   * variables.
-   *
-   * Default values can supplied with `{{var:default}}`.
-   *
+   * <p>
+   * Default values can be supplied with `{{var:default}}`.
+   * <p>
   * Built-in transformations can be applied to variables, including:
-   *
+   * <p>
   * - `{{var toName}}`, `{{var:default toName}}`: canonicalize the
   *   variable as a valid K8s object name.
   * - `{{var toUpperCase}}`, `{{var:default toUpperCase}}`: render in
   *   all upper case.
   * - `{{var toLowerCase}}`, `{{var:default toLowerCase}}`: render in
   *   all lower case.
-   * - `{{var concat}}`, `{{var:default concat}}`: concatinate a multiline
+   * - `{{var concat}}`, `{{var:default concat}}`: concatenate a multiline
   *   string into one line
   * - `{{var concat toUpperCase}}`: apply both transformations in sequence.
-   *
+   * <p>
   * If `var` contains multiple lines, the behavior depends on context;
   * specifically, whether the pattern appears within a list or comment
   * (prefixed with `-` or `#`). For example, if the template includes:
-   *
+   * <p>
   *   - {{var}}
-   *
+   * <p>
   * ...and `var` contains multiple lines, then the output will be:
-   *
+   * <p>
   *   - value line 1
   *   - value line 2
-   *
+   * <p>
   * To avoid this behavior (and just get a multiline string), use one of
   * YAML's multiline markers, e.g.
-   *
+   * <p>
   *   - |
   *     {{var}}
-   *
+   * <p>
   * In either case, the multiline string will be properly indented.
   */
  public static class SimpleTemplate implements Template {
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java
index 964b247c..bf882163 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ResourceProvider.java
@@ -10,7 +10,7 @@
 
 /**
  * Enables an adapter to emit arbitrary Resources for a given table.
- *
+ * <p>
 * Optionally, establishes source->sink relationships between such Resources. These are used
 * strictly for debugging purposes.
 */
@@ -31,10 +31,10 @@ default Collection<Resource> writeResources(String tableName) {
 
   /**
    * Establishes a source->sink relationship between ResourceProviders.
-   *
+   * <p>
    * All leaf-node Resources provided by this ResourceProvider will become sources. All nodes
    * provided by the given ResourceProvider will be sinks.
-   *
+   * <p>
    * e.g.
    *
    *   ResourceProvider.empty().with(x -> a).with(x -> b).to(x -> c).to(x -> d)
@@ -49,11 +49,8 @@ default Collection<Resource> writeResources(String tableName) {
    */
   default ResourceProvider toAll(ResourceProvider sink) {
     return x -> {
-      List<Resource> combined = new ArrayList<>();
-      List<Resource> sources = new ArrayList<>();
-      List<Resource> sinks = new ArrayList<>();
-      sources.addAll(resources(x));
-      combined.addAll(sources);
+      List<Resource> sources = new ArrayList<>(resources(x));
+      List<Resource> combined = new ArrayList<>(sources);
 
       // remove all non-leaf-node upstream Resources
       sources.removeAll(sources.stream().flatMap(y -> y.inputs().stream()).collect(Collectors.toList()));
@@ -64,13 +61,11 @@ default ResourceProvider toAll(ResourceProvider sink) {
           .collect(Collectors.toList()));
 
       // link all sources to all sinks
-      sink.resources(x).forEach(y -> {
-        combined.add(new Resource(y) {{
-          if (!(y instanceof ReadResource || y instanceof WriteResource)) {
-            sources.forEach(z -> input(z));
-          }
-        }});
-      });
+      sink.resources(x).forEach(y -> combined.add(new Resource(y) {{
+        if (!(y instanceof ReadResource || y instanceof WriteResource)) {
+          sources.forEach(this::input);
+        }
+      }}));
 
       return combined;
     };
@@ -112,7 +107,7 @@ default ResourceProvider readWithAll(ResourceProvider readResourceProvider) {
       List<Resource> combined = new ArrayList<>();
       combined.addAll(resources(x));
       combined.addAll(
-          readResourceProvider.resources(x).stream().map(y -> new ReadResource(y)).collect(Collectors.toList()));
+          readResourceProvider.resources(x).stream().map(ReadResource::new).collect(Collectors.toList()));
       return combined;
     };
   }
@@ -123,7 +118,7 @@ default ResourceProvider writeWithAll(ResourceProvider writeResourceProvider) {
       List<Resource> combined = new ArrayList<>();
       combined.addAll(resources(x));
       combined.addAll(
-          writeResourceProvider.resources(x).stream().map(y -> new WriteResource(y)).collect(Collectors.toList()));
+          writeResourceProvider.resources(x).stream().map(WriteResource::new).collect(Collectors.toList()));
       return combined;
     };
   }
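Many of the changes in this patch replace single-use lambdas with method or constructor references, e.g. `map(y -> new ReadResource(y))` becoming `map(ReadResource::new)` above. The two forms are interchangeable wherever a functional interface is expected; a minimal, self-contained sketch (the `Wrapper` class here is invented for illustration, not a Hoptimator type):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ConstructorRefDemo {
  // Stand-in for wrapper types like ReadResource/WriteResource.
  static class Wrapper {
    final String value;
    Wrapper(String value) {
      this.value = value;
    }
  }

  public static void main(String[] args) {
    List<String> names = List.of("a", "b", "c");

    // Lambda form, as in the old code:
    List<Wrapper> viaLambda = names.stream().map(x -> new Wrapper(x)).collect(Collectors.toList());

    // Constructor-reference form, as in the new code; behaves identically:
    List<Wrapper> viaRef = names.stream().map(Wrapper::new).collect(Collectors.toList());

    System.out.println(viaLambda.size() == viaRef.size());
  }
}
```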
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/RuleService.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/RuleService.java
index 492416c5..28dffc84 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/RuleService.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/RuleService.java
@@ -17,7 +17,7 @@ private RuleService() {
   public static Collection<RuleProvider> providers() {
     ServiceLoader<RuleProvider> loader = ServiceLoader.load(RuleProvider.class);
     List<RuleProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
 
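RuleService, like CatalogService, ControllerService, ValidationService, and ConnectionService further down, discovers plugins through `java.util.ServiceLoader`; the patch only swaps the `x -> providers.add(x)` lambda for a method reference. A rough, runnable sketch of the same discovery pattern (the `Provider` interface is a stand-in, not a real Hoptimator SPI):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ServiceLoader;

public final class ProviderDiscovery {
  // Stand-in SPI; the real code loads e.g. RuleProvider or CatalogProvider.
  public interface Provider {
    String name();
  }

  public static Collection<Provider> providers() {
    ServiceLoader<Provider> loader = ServiceLoader.load(Provider.class);
    List<Provider> providers = new ArrayList<>();
    // Method reference replaces the x -> providers.add(x) lambda.
    loader.iterator().forEachRemaining(providers::add);
    return providers;
  }

  public static void main(String[] args) {
    // Prints 0 unless implementations are registered in META-INF/services.
    System.out.println(providers().size());
  }
}
```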
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
index 5142bad9..939aed50 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
@@ -24,6 +24,7 @@
 import org.apache.calcite.sql.SqlRowTypeNameSpec;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.SqlWriterConfig;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
 import org.apache.calcite.sql.fun.SqlRowOperator;
 import org.apache.calcite.sql.parser.SqlParserPos;
@@ -34,24 +35,24 @@
 
 /**
  * An abstract way to write SQL scripts.
- *
+ * <p>
  * This enables Adapters to implement themselves without being tied to a
  * specific compute engine or SQL dialect.
- *
+ * <p>
  * To generate a specific statement, implement this interface, or use one
  * of the provided implementations, e.g. `QueryImplementor`.
- *
+ * <p>
  * To generate a script (more than one statement), start with `empty()`
  * and append subsequent ScriptImplementors with `with(...)` etc.
- *
+ * <p>
  * e.g.
- *
+ * <p>
  *   ScriptImplementor.empty()
  *     .database(db)
  *     .connector(db, name, rowType, configs)
- *
+ * <p>
  * ... would produce something like
- *
+ * <p>
  *   CREATE DATABASE IF NOT EXIST `FOO`;
  *   CREATE TABLE `BAR` (NAME VARCHAR) WITH ('key1' = 'value1');
  */
@@ -102,10 +103,7 @@ default String sql() {
 
   /** Render the script as DDL/SQL in the given dialect */
   default String sql(SqlDialect dialect) {
-    SqlWriter w = new SqlPrettyWriter(dialect);
-//  TODO: fix in next Calcite version
-//  above is deprecated; replace with:
-//  SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
+    SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
     implement(w);
     return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll(";", ";\n").trim();
   }
@@ -165,7 +163,7 @@ public SqlNode visit(SqlCall call) {
 
   /**
    * Implements a CREATE TABLE...WITH... DDL statement.
-   *
+   * <p>
   * N.B. the following magic:
   * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
   * - NULL fields are promoted to BYTES
@@ -227,10 +225,10 @@ public void implement(SqlWriter w) {
   }
 
   /** Implements an INSERT INTO statement.
-   *
+   * <p>
   * N.B. the following magic:
   * - NULL columns (e.g. `NULL AS KEY`) are elided from the pipeline
-   *
+   * <p>
   * */
   class InsertImplementor implements ScriptImplementor {
     private final String database;
@@ -305,7 +303,7 @@ public void implement(SqlWriter w) {
   }
 
   /** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`.
-   *
+   * <p>
   * N.B. the following magic:
   * - NULL fields are promoted to BYTES
   */
@@ -320,11 +318,11 @@ public RowTypeSpecImplementor(RelDataType dataType) {
     public void implement(SqlWriter w) {
       List<SqlIdentifier> fieldNames = dataType.getFieldList()
           .stream()
-          .map(x -> x.getName())
+          .map(RelDataTypeField::getName)
           .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
           .collect(Collectors.toList());
       List<SqlDataTypeSpec> fieldTypes =
-          dataType.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList());
+          dataType.getFieldList().stream().map(RelDataTypeField::getType).map(RowTypeSpecImplementor::toSpec).collect(Collectors.toList());
       for (int i = 0; i < fieldNames.size(); i++) {
         w.sep(",");
         fieldNames.get(i).unparse(w, 0, 0);
@@ -340,11 +338,11 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
       if (dataType.isStruct()) {
         List<SqlIdentifier> fieldNames = dataType.getFieldList()
             .stream()
-            .map(x -> x.getName())
+            .map(RelDataTypeField::getName)
             .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
             .collect(Collectors.toList());
         List<SqlDataTypeSpec> fieldTypes =
-            dataType.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList());
+            dataType.getFieldList().stream().map(RelDataTypeField::getType).map(RowTypeSpecImplementor::toSpec).collect(Collectors.toList());
         return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO));
       }
@@ -384,12 +382,12 @@ public ColumnListImplementor(List<RelDataTypeField> fields) {
     @Override
     public void implement(SqlWriter w) {
       List<SqlIdentifier> fieldNames = fields.stream()
-          .map(x -> x.getName())
+          .map(RelDataTypeField::getName)
           .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
           .collect(Collectors.toList());
-      for (int i = 0; i < fieldNames.size(); i++) {
+      for (SqlIdentifier fieldName : fieldNames) {
         w.sep(",");
-        fieldNames.get(i).unparse(w, 0, 0);
+        fieldName.unparse(w, 0, 0);
       }
     }
   }
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java
index 72a74b1c..b6c95c81 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java
@@ -12,7 +12,7 @@ public interface TableResolver {
   RelDataType resolve(String table) throws InterruptedException, ExecutionException;
 
   static TableResolver from(Function<String, RelDataType> f) {
-    return x -> f.apply(x);
+    return f::apply;
   }
 
   /** Appends an extra column to the resolved type */
diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
index 913d4da8..df2e25d0 100644
--- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
+++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
@@ -34,8 +34,7 @@ public String getInfoMessage() {
   }
 
   public Collection<CommandHandler> getCommandHandlers(SqlLine sqlline) {
-    Collection<CommandHandler> list = new ArrayList<>();
-    list.addAll(super.getCommandHandlers(sqlline));
+    Collection<CommandHandler> list = new ArrayList<>(super.getCommandHandlers(sqlline));
     list.add(new IntroCommandHandler(sqlline));
     list.add(new PipelineCommandHandler(sqlline));
     list.add(new SpecifyCommandHandler(sqlline));
@@ -172,7 +171,6 @@ public void execute(String line, DispatchCallback dispatchCallback) {
       } catch (SQLException e) {
         sqlline.error(e);
         dispatchCallback.setToFailure();
-        return;
       }
     }
diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java
index 1a1087ee..7cf4f250 100644
--- a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java
+++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java
@@ -24,8 +24,8 @@
  *
  */
 public class FlinkStreamingSqlJobReconciler implements Reconciler {
-  private final static Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class);
-  private final static String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob";
+  private static final Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class);
+  private static final String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob";
 
   private final Operator operator;
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CatalogService.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CatalogService.java
index bcfb7849..bd5d5292 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CatalogService.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CatalogService.java
@@ -17,12 +17,12 @@ private CatalogService() {
   }
 
   // This bit of magic means we can trivially implement a RemoteTable:
-  public static final Api API = () -> catalogs();
+  public static final Api API = CatalogService::catalogs;
 
   public static Collection<CatalogProvider> providers() {
     ServiceLoader<CatalogProvider> loader = ServiceLoader.load(CatalogProvider.class);
     List<CatalogProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
index d8160bd3..5f6d67e6 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
@@ -13,7 +13,6 @@
 
 /** Provides BackwardCompatibilityValidator and ForwardCompatibilityValidator. */
 public class CompatibilityValidatorProvider implements ValidatorProvider {
-  @SuppressWarnings("unchecked")
   @Override
   public <T> Collection<Validator> validators(T obj) {
     if (obj instanceof SchemaPlus) {
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
index 06d244ff..bd02376c 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
@@ -112,10 +112,8 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
       final SqlNode q = renameColumns(create.columnList, create.query);
       final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
       List<String> schemaPath = pair.left.path(null);
-      String schemaName = schemaPlus.getName();
       String viewName = pair.right;
-      List<String> viewPath = new ArrayList<>();
-      viewPath.addAll(schemaPath);
+      List<String> viewPath = new ArrayList<>(schemaPath);
       viewPath.add(viewName);
       CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(connection, sql);
       RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
@@ -165,8 +163,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
     SchemaPlus schemaPlus = pair.left.plus();
     String schemaName = schemaPlus.getName();
     String viewName = pair.right;
-    List<String> viewPath = new ArrayList<>();
-    viewPath.addAll(schemaPath);
+    List<String> viewPath = new ArrayList<>(schemaPath);
     viewPath.add(viewName);
     try {
       if (!(pair.left.schema instanceof Database)) {
@@ -191,8 +188,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
         pipelineName = pipelineName + "-" + viewParts[1];
       }
       connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
-      List<String> sinkPath = new ArrayList<>();
-      sinkPath.addAll(schemaPath);
+      List<String> sinkPath = new ArrayList<>(schemaPath);
       sinkPath.add(sinkName);
       Table sink = pair.left.plus().getTable(sinkName);
 
@@ -243,7 +239,7 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
     String viewName = pair.right;
     SchemaPlus schemaPlus = pair.left.plus();
-    String schemaName = schemaPlus.getName();
     Table table = schemaPlus.getTable(viewName);
     if (table == null) {
       if (drop.ifExists) {
@@ -253,8 +248,7 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
     }
 
     final List<String> schemaPath = pair.left.path(null);
-    List<String> viewPath = new ArrayList<>();
-    viewPath.addAll(schemaPath);
+    List<String> viewPath = new ArrayList<>(schemaPath);
     viewPath.add(viewName);
 
     try {
@@ -306,18 +300,16 @@ static SqlNode renameColumns(SqlNodeList columnList, SqlNode query) {
     final SqlParserPos p = query.getParserPosition();
     final SqlNodeList selectList = SqlNodeList.SINGLETON_STAR;
     final SqlCall from = SqlStdOperatorTable.AS.createCall(p,
-        Arrays.asList(new SqlNode[]{query, new SqlIdentifier("_", p), columnList}));
+        Arrays.asList(query, new SqlIdentifier("_", p), columnList));
     return new SqlSelect(p, null, selectList, from, null, null, null, null, null, null, null, null, null);
   }
 
   /** Unchecked exception related to a DDL statement. */
   static public class DdlException extends RuntimeException {
-    private final SqlParserPos pos;
 
     private DdlException(SqlNode node, SqlParserPos pos, String msg, Throwable cause) {
       super("Cannot " + node.toString() + " at line " + pos.getLineNum() + " col " + pos.getColumnNum() + ": " + msg,
           cause);
-      this.pos = pos;
     }
 
     DdlException(SqlNode node, String msg, Throwable cause) {
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java
index 0d4b928a..53c32cd5 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java
@@ -119,7 +119,7 @@ public Connection connect(String url, Properties props) throws SQLException {
       Wrapped wrapped = new Wrapped(hoptimatorConnection, rootSchema);
 
       String[] catalogs = properties.getProperty("catalogs", "").split(",");
-      if (catalogs.length == 0 || catalogs[0].length() == 0) {
+      if (catalogs.length == 0 || catalogs[0].isEmpty()) {
         // load all catalogs (typical usage)
         for (Catalog catalog : CatalogService.catalogs()) {
           catalog.register(wrapped);
@@ -138,7 +138,7 @@ public Connection connect(String url, Properties props) throws SQLException {
     }
   }
 
-  private static class ConnectionHolder {
+  private static final class ConnectionHolder {
     HoptimatorConnection connection;
   }
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
index 27f8e463..6aa160ff 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
@@ -54,14 +54,14 @@ public static <T> void validateOrThrow(T obj) throws SQLException {
     Validator.Issues issues = new Validator.Issues("");
     validate(obj, issues);
     if (!issues.valid()) {
-      throw new SQLDataException("Failed validation:\n" + issues.toString());
+      throw new SQLDataException("Failed validation:\n" + issues);
     }
   }
 
   public static Collection<ValidatorProvider> providers() {
     ServiceLoader<ValidatorProvider> loader = ServiceLoader.load(ValidatorProvider.class);
     List<ValidatorProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/JdbcTestBase.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/JdbcTestBase.java
index ed9ce98a..a99d55ca 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/JdbcTestBase.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/JdbcTestBase.java
@@ -49,7 +49,7 @@ protected void assertQueryEmpty(String q) throws SQLException {
 
   protected void assertQueryNonEmpty(String q) throws SQLException {
     List res = query(q);
-    Assertions.assertTrue(!res.isEmpty(), "ResultSet is empty");
+    Assertions.assertFalse(res.isEmpty(), "ResultSet is empty");
   }
 
   protected List query(String query) throws SQLException {
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
index 263512fd..310d4473 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
@@ -22,7 +22,7 @@
 
 public class K8sApi implements Api {
 
-  private final static Logger log = LoggerFactory.getLogger(K8sApi.class);
+  private static final Logger log = LoggerFactory.getLogger(K8sApi.class);
 
   private final K8sContext context;
   private final K8sApiEndpoint endpoint;
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
index f7a79f72..aa2f1043 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
@@ -16,7 +16,7 @@
 
 public class K8sYamlApi implements Api {
 
-  private final static Logger log = LoggerFactory.getLogger(K8sYamlApi.class);
+  private static final Logger log = LoggerFactory.getLogger(K8sYamlApi.class);
 
   private final K8sContext context;
diff --git a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
index 7bf7d191..11666d7e 100644
--- a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
+++ b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaControllerProvider.java
@@ -43,6 +43,6 @@ public Collection<Controller> controllers(Operator operator) {
         .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Acl.class, x).build())
         .build();
 
-    return Arrays.asList(new Controller[]{topicController, topicAclController});
+    return Arrays.asList(topicController, topicAclController);
   }
 }
diff --git a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java
index 5173e2e2..2d46ed0f 100644
--- a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java
+++ b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicAclReconciler.java
@@ -28,9 +28,9 @@
 
 public class KafkaTopicAclReconciler implements Reconciler {
-  private final static Logger log = LoggerFactory.getLogger(KafkaTopicAclReconciler.class);
-  private final static String ACL = "hoptimator.linkedin.com/v1alpha1/Acl";
-  private final static String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
+  private static final Logger log = LoggerFactory.getLogger(KafkaTopicAclReconciler.class);
+  private static final String ACL = "hoptimator.linkedin.com/v1alpha1/Acl";
+  private static final String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
 
   private final Operator operator;
@@ -87,20 +87,17 @@ public Result reconcile(Request request) {
       ConfigAssembler assembler = new ConfigAssembler(operator);
       list(target.getSpec().getClientConfigs()).forEach(
           x -> assembler.addRef(namespace, x.getConfigMapRef().getName()));
-      map(target.getSpec().getClientOverrides()).forEach((k, v) -> assembler.addOverride(k, v));
+      map(target.getSpec().getClientOverrides()).forEach(assembler::addOverride);
       Properties properties = assembler.assembleProperties();
       log.info("Using AdminClient config: {}", properties);
 
-      AdminClient admin = AdminClient.create(properties);
-      try {
+      try (AdminClient admin = AdminClient.create(properties)) {
         log.info("Creating KafkaTopic Acl for {}...", target.getSpec().getTopicName());
         AclBinding binding = new AclBinding(
             new ResourcePattern(ResourceType.TOPIC, target.getSpec().getTopicName(), PatternType.LITERAL),
             new AccessControlEntry(principal, "*", operation, AclPermissionType.ALLOW));
         admin.createAcls(Collections.singleton(binding)).all().get();
         log.info("Granted {} {} access to {}.", principal, method, target.getSpec().getTopicName());
-      } finally {
-        admin.close();
       }
     } catch (Exception e) {
       log.error("Encountered exception while reconciling KafkaTopic Acl {}/{}", namespace, name, e);
diff --git a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java
index 08cb8377..21e57667 100644
--- a/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java
+++ b/hoptimator-kafka-controller/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java
@@ -26,8 +26,8 @@
 
 public class KafkaTopicReconciler implements Reconciler {
-  private final static Logger log = LoggerFactory.getLogger(KafkaTopicReconciler.class);
-  private final static String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
+  private static final Logger log = LoggerFactory.getLogger(KafkaTopicReconciler.class);
+  private static final String KAFKATOPIC = "hoptimator.linkedin.com/v1alpha1/KafkaTopic";
 
   private final Operator operator;
@@ -61,7 +61,7 @@ public Result reconcile(Request request) {
       ConfigAssembler assembler = new ConfigAssembler(operator);
       list(object.getSpec().getClientConfigs()).forEach(
           x -> assembler.addRef(namespace, x.getConfigMapRef().getName()));
-      map(object.getSpec().getClientOverrides()).forEach((k, v) -> assembler.addOverride(k, v));
+      map(object.getSpec().getClientOverrides()).forEach(assembler::addOverride);
       Properties properties = assembler.assembleProperties();
       log.info("Using AdminClient config: {}", properties);
 
@@ -88,7 +88,7 @@ public Result reconcile(Request request) {
         if (e.getCause() instanceof UnknownTopicOrPartitionException) {
           log.info("No existing topic {}. Will create it.", topicName);
           admin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.ofNullable(desiredPartitions),
-              Optional.ofNullable(desiredReplicationFactor).map(x -> x.shortValue())))).all().get();
+              Optional.ofNullable(desiredReplicationFactor).map(Integer::shortValue)))).all().get();
           object.getStatus().setNumPartitions(desiredPartitions);
         } else {
           throw e;
diff --git a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/ClusterSchema.java b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/ClusterSchema.java
index 3c8c32c8..3d323d98 100644
--- a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/ClusterSchema.java
+++ b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/ClusterSchema.java
@@ -15,7 +15,7 @@
 
 public class ClusterSchema extends AbstractSchema {
 
-  private static Logger log = LoggerFactory.getLogger(ClusterSchema.class);
+  private static final Logger log = LoggerFactory.getLogger(ClusterSchema.class);
 
   private final Properties properties;
   private final Map<String, Table> tableMap = new HashMap<>();
@@ -33,6 +33,7 @@ public void populate() throws InterruptedException, ExecutionException {
     for (String name : topicNames) {
       tableMap.put(name, new KafkaTopic(name, properties));
     }
+    adminClient.close();
   }
 
   @Override
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ConfigAssembler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ConfigAssembler.java
index 03c098e2..d1c65816 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ConfigAssembler.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ConfigAssembler.java
@@ -29,13 +29,13 @@ public void addRef(String namespace, String name) {
 
   public Map<String, String> assemble() {
     Map<String, String> combined = new HashMap<>();
     refs.forEach(x -> combined.putAll(x.fetch(operator)));
-    overrides.forEach((k, v) -> combined.put(k, v));
+    combined.putAll(overrides);
     return combined;
   }
 
   public Properties assembleProperties() {
     Properties properties = new Properties();
-    assemble().forEach((k, v) -> properties.put(k, v));
+    properties.putAll(assemble());
     return properties;
   }
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ControllerService.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ControllerService.java
index 6486ff4c..f5180f32 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ControllerService.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/ControllerService.java
@@ -18,7 +18,7 @@ private ControllerService() {
   public static Collection<ControllerProvider> providers() {
     ServiceLoader<ControllerProvider> loader = ServiceLoader.load(ControllerProvider.class);
     List<ControllerProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java
index 3acdedd5..66c8caa6 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java
@@ -88,7 +88,7 @@ public void run() throws Exception {
     HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromJdbc(url, properties);
 
     // ensure JDBC connection works, and that static classes are initialized in the main thread
-    HoptimatorPlanner planner = plannerFactory.makePlanner();
+    plannerFactory.makePlanner();
 
     Properties connectionProperties = new Properties();
     connectionProperties.putAll(properties);
@@ -101,8 +101,7 @@ public void run() throws Exception {
     operator.registerApi("Subscription", "subscription", "subscriptions", "hoptimator.linkedin.com", "v1alpha1",
         V1alpha1Subscription.class, V1alpha1SubscriptionList.class);
 
-    List<Controller> controllers = new ArrayList<>();
-    controllers.addAll(ControllerService.controllers(operator));
+    List<Controller> controllers = new ArrayList<>(ControllerService.controllers(operator));
     controllers.add(SubscriptionReconciler.controller(operator, plannerFactory, environment, subscriptionFilter));
 
     context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace);
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
index 4a635bde..c4ceec0c 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
@@ -28,7 +28,7 @@
 
 /** Single handle to all the clients and configs required by all the controllers. */
 public class Operator {
-  private final static Logger log = LoggerFactory.getLogger(Operator.class);
+  private static final Logger log = LoggerFactory.getLogger(Operator.class);
 
   private final String namespace;
   private final ApiClient apiClient;
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java
index d75bf54a..8ee6ce0d 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java
@@ -29,7 +29,7 @@
  * Manages Pipelines.
  */
 public final class PipelineReconciler implements Reconciler {
-  private final static Logger log = LoggerFactory.getLogger(PipelineReconciler.class);
+  private static final Logger log = LoggerFactory.getLogger(PipelineReconciler.class);
 
   private final K8sContext context;
   private final K8sApi pipelineApi;
@@ -62,8 +62,7 @@ public Result reconcile(Request request) {
 
     log.info("Checking status of Pipeline {}...", name);
 
-    boolean ready = Arrays.asList(object.getSpec().getYaml().split("\n---\n"))
-        .stream()
+    boolean ready = Arrays.stream(object.getSpec().getYaml().split("\n---\n"))
         .filter(x -> x != null && !x.isEmpty())
         .allMatch(x -> isReady(x, namespace));
 
@@ -96,12 +95,12 @@ public Result reconcile(Request request) {
   }
 
   // TODO load from configuration
-  protected Duration failureRetryDuration() {
+  private Duration failureRetryDuration() {
     return Duration.ofMinutes(5);
   }
 
   // TODO load from configuration
-  protected Duration pendingRetryDuration() {
+  private Duration pendingRetryDuration() {
     return Duration.ofMinutes(1);
   }
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java
index 18b452ee..6c6807a3 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java
@@ -7,15 +7,15 @@
 
 /**
  * Exposes Subscription variables to resource templates.
- *
+ * <p>
 * Variables have a `pipeline.` prefix (even though they come from the
 * Subscription object), because the planner is unaware of Subscriptions.
 * For example, the CLI constructs pipelines without any corresponding
- * Subscription object. In future, we may have additional K8s objects
+ * Subscription object. In the future, we may have additional K8s objects
 * that result in pipelines.
- *
+ * <p>
 * The exported variables include:
- *
+ * <p>
 * - `pipeline.namespace`, the K8s namespace where the pipeline should be
 *   deployed. This is a recommendation -- templates may elect to ignore it.
 * - `pipeline.name`, a unique name for the pipeline. Templates can use this
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
index dc3d95c7..47c54253 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
@@ -35,8 +35,8 @@
 
 public final class SubscriptionReconciler implements Reconciler {
-  private final static Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class);
-  private final static String SUBSCRIPTION = "hoptimator.linkedin.com/v1alpha1/Subscription";
+  private static final Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class);
+  private static final String SUBSCRIPTION = "hoptimator.linkedin.com/v1alpha1/Subscription";
 
   private final Operator operator;
   private final HoptimatorPlanner.Factory plannerFactory;
@@ -97,7 +97,7 @@ public Result reconcile(Request request) {
       //   1. Plan a pipeline, and write the plan to Status.
       //   2. Deploy the pipeline per plan.
       //   3. Verify readiness of the entire pipeline.
-      // Each phase should be a separate reconcilation loop to avoid races.
+      // Each phase should be a separate reconciliation loop to avoid races.
       // TODO: We should disown orphaned resources when the pipeline changes.
       if (diverged(object.getSpec(), status)) {
         // Phase 1
@@ -167,7 +167,7 @@ public Result reconcile(Request request) {
       } else {
         log.info("Checking status of pipeline for {}/{}...", kind, name);
 
-        boolean ready = status.getResources().stream().allMatch(x -> operator.isReady(x));
+        boolean ready = status.getResources().stream().allMatch(operator::isReady);
 
         if (ready) {
           status.setReady(true);
@@ -184,9 +184,9 @@ public Result reconcile(Request request) {
       }
 
       status.setAttributes(Stream.concat(status.getJobResources().stream(), status.getDownstreamResources().stream())
-          .map(x -> fetchAttributes(x))
+          .map(this::fetchAttributes)
          .flatMap(x -> x.entrySet().stream())
-          .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue(), (x, y) -> y)));
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y)));
 
      operator.apiFor(SUBSCRIPTION)
          .updateStatus(object, x -> object.getStatus())
@@ -262,7 +262,7 @@ private static Map<String, String> guessAttributes(DynamicKubernetesObject obj)
          .entrySet()
          .stream()
          .filter(x -> x.getValue().isJsonPrimitive())
-          .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString()));
+          .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString()));
    } catch (Exception e) {
      log.debug("Exception looking for .status.attributes. Swallowing.", e);
    }
@@ -275,7 +275,7 @@ private static Map<String, String> guessAttributes(DynamicKubernetesObject obj)
          .entrySet()
          .stream()
          .filter(x -> x.getValue().isJsonPrimitive())
-          .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString()));
+          .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString()));
    } catch (Exception e) {
      log.debug("Exception looking for .status.jobStatus. Swallowing.", e);
    }
@@ -286,7 +286,7 @@ private static Map<String, String> guessAttributes(DynamicKubernetesObject obj)
          .entrySet()
          .stream()
          .filter(x -> x.getValue().isJsonPrimitive())
-          .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString()));
+          .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString()));
    } catch (Exception e) {
      log.debug("Exception looking for .status. Swallowing.", e);
    }
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java
index bd060952..0a75d4fa 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java
@@ -84,7 +84,7 @@ static Factory fromJdbc(String url, Properties properties) {
 
   public HoptimatorPlanner(SchemaPlus schema) {
     this.schema = schema;
-    List<RelTraitDef> traitDefs = new ArrayList();
+    List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
 
@@ -119,7 +119,7 @@ public RelNode logical(String sql) throws Exception {
 
   public Database database(String name) {
     String rootName = schema.getName();
-    if (rootName == null || rootName.length() == 0) {
+    if (rootName == null || rootName.isEmpty()) {
       rootName = "ROOT";
     }
     Schema subSchema = schema.getSubSchema(name);
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
index af0978c3..e9ba1544 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/Pipeline.java
@@ -74,7 +74,7 @@ public String mermaid() {
     sb.append("flowchart\n");
     Map<String, List<Resource>> grouped = resources().stream().collect(Collectors.groupingBy(x -> x.template()));
     grouped.forEach((k, v) -> {
-      sb.append("  subgraph " + k + "\n");
+      sb.append("  subgraph ").append(k).append("\n");
       v.forEach(x -> {
         String description = x.keys()
             .stream()
@@ -83,15 +83,15 @@ public String mermaid() {
             .filter(k2 -> !"id".equals(k2))
             .map(k2 -> k2 + ": " + sanitize(x.property(k2)))
             .collect(Collectors.joining("\n"));
-        sb.append("    " + id(x) + "[\"" + description + "\"]\n");
+        sb.append("    ").append(id(x)).append("[\"").append(description).append("\"]\n");
       });
       sb.append("  end\n");
     });
     grouped.forEach((k, v) -> {
-      sb.append("  subgraph " + k + "\n");
+      sb.append("  subgraph ").append(k).append("\n");
       v.forEach(x -> {
         x.inputs().forEach(y -> {
-          sb.append("    " + id(y) + " --> " + id(x) + "\n");
+          sb.append("    ").append(id(y)).append(" --> ").append(id(x)).append("\n");
         });
       });
       sb.append("  end\n");
@@ -100,7 +100,7 @@ public String mermaid() {
   }
 
   private static String id(Resource resource) {
-    return "R" + Integer.toString(resource.hashCode());
+    return "R" + resource.hashCode();
   }
 
   private static String sanitize(String s) {
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
index 57124532..27de5b6a 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java
@@ -20,7 +20,7 @@
 
 /**
  * Calling convention which implements an SQL-based streaming data pipeline.
- *
+ * <p>
 * Pipelines tend to have the following general shape:
 *
  *                                   _________
@@ -33,7 +33,6 @@
  * CDC and rETL "hops" are made possible by Resources, which describe any
  * additional infra required by the Pipeline. As Resources are essentially
  * YAML, anything can be represented there, including additional SQL jobs.
- *
  */
 public interface PipelineRel extends RelNode {
 
@@ -67,7 +66,7 @@ public void resource(Resource resource) {
     }
 
     private void visit(RelNode input) {
-      input.getInputs().forEach(x -> visit(x));
+      input.getInputs().forEach(this::visit);
       ((PipelineRel) input).implement(this);
     }
 
@@ -83,10 +82,10 @@ public ScriptImplementor insertInto(HopTable sink) {
       return script.database(sink.database()).with(sink).insert(sink.database(), sink.name(), castRel);
     }
 
-    /** Add any resources, SQL, DDL etc required to access the table. */
+    /** Add any resources: SQL, DDL, etc. required to access the table. */
     public void implement(HopTable table) {
       script = script.database(table.database()).with(table);
-      table.readResources().forEach(x -> resource(x));
+      table.readResources().forEach(this::resource);
     }
 
     /** Combine SQL and any Resources into a Pipeline, using ANSI dialect */
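Note on the visit() refactor above: it is a plain post-order traversal, so every input is implemented before the node that consumes it. A minimal standalone sketch of that pattern (the PostOrder class and Consumer wiring are illustrative, not part of this patch; only RelNode comes from Calcite):

    import java.util.function.Consumer;
    import org.apache.calcite.rel.RelNode;

    final class PostOrder {
      private PostOrder() {
      }

      // Applies action to every node in the tree, inputs first.
      static void visit(RelNode node, Consumer<RelNode> action) {
        node.getInputs().forEach(x -> visit(x, action)); // recurse into children first
        action.accept(node);                             // then handle this node
      }
    }

Implementor.visit() has the same shape, with the action being ((PipelineRel) input).implement(this).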
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java
index 51c93d03..26b1c9d9 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRules.java
@@ -244,7 +244,7 @@ public void implement(Implementor implementor) {
   }
 
  static Table findTable(CalciteSchema schema, List<String> qualifiedName) {
-    if (qualifiedName.size() == 0) {
+    if (qualifiedName.isEmpty()) {
       throw new IllegalArgumentException("Empty qualified name.");
     } else if (qualifiedName.size() == 1) {
       String name = qualifiedName.get(0);
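For context on the findTable() guard above: the method resolves a qualified name such as ["DB", "TABLE"] by walking sub-schemas. A sketch of that resolution under assumed Calcite APIs (lookupTable is a hypothetical name, and the real findTable also deals with case-insensitive matching):

    import java.util.List;
    import org.apache.calcite.jdbc.CalciteSchema;
    import org.apache.calcite.schema.Table;

    final class Lookup {
      private Lookup() {
      }

      static Table lookupTable(CalciteSchema schema, List<String> qualifiedName) {
        if (qualifiedName.isEmpty()) {
          throw new IllegalArgumentException("Empty qualified name.");
        }
        if (qualifiedName.size() == 1) {
          // Last component: look up the table itself.
          CalciteSchema.TableEntry entry = schema.getTable(qualifiedName.get(0), true);
          if (entry == null) {
            throw new IllegalArgumentException("No table: " + qualifiedName.get(0));
          }
          return entry.getTable();
        }
        // Leading components name nested schemas.
        CalciteSchema sub = schema.getSubSchema(qualifiedName.get(0), true);
        if (sub == null) {
          throw new IllegalArgumentException("No schema: " + qualifiedName.get(0));
        }
        return lookupTable(sub, qualifiedName.subList(1, qualifiedName.size()));
      }
    }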
diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java
index a3779e42..270e48f5 100644
--- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java
+++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/SqlJob.java
@@ -5,13 +5,12 @@
 
 /**
  * Anything that can run SQL, e.g. a Flink job.
- *
+ * <p>
  * The planner generates these, but they are not directly deployable. Instead,
  * an adapter should provide a template that turns SqlJobs into something
  * concrete and deployable, e.g. a FlinkDeployment.
- *
+ * <p>
  * To do so, an adapter just needs to include a proper `SqlJob.template.yaml`.
- *
  */
 public class SqlJob extends Resource {
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java
index 40e4553d..2747cab3 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java
@@ -31,7 +31,7 @@ public static Map configure(T obj, Properties connectionProp
 
   public static Collection<ConnectorProvider> providers() {
     ServiceLoader<ConnectorProvider> loader = ServiceLoader.load(ConnectorProvider.class);
     List<ConnectorProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
index 69fa1847..e77fc839 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
@@ -25,10 +25,10 @@ private DataTypeUtils() {
 
   /**
    * Flattens nested structs and complex arrays.
-   *
+   * <p>
    * Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to
    * top-level fields like `FOO$BAR$QUX VARCHAR`.
-   *
+   * <p>
    * Complex arrays like `FOO Row(BAR Row(QUX VARCHAR)) ARRAY` are promoted to
    * top-level fields like `FOO ANY ARRAY` and `FOO$BAR$QUX VARCHAR`.
    * Primitive arrays are unchanged.
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java
index 594b8883..8f33886d 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java
@@ -23,13 +23,13 @@ class DelegatingStatement implements Statement {
 
   @Override
   public boolean execute(String sql) throws SQLException {
-    executeQuery(sql);
+    resultSet = executeQuery(sql);
     return true;
   }
 
   @Override
   public ResultSet executeQuery(String sql) throws SQLException {
-    // Split multi-statement queries and execute any leading updates 
+    // Split multi-statement queries and execute any leading updates
     String[] parts = sql.split(";\n");
     int i = 0;
     for (; i < parts.length - 1; i++) {
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
index 550bec6c..83206d90 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
@@ -68,7 +68,7 @@ public static List specify(T obj, Properties conn
 
   public static Collection<DeployerProvider> providers() {
     ServiceLoader<DeployerProvider> loader = ServiceLoader.load(DeployerProvider.class);
     List<DeployerProvider> providers = new ArrayList<>();
-    loader.iterator().forEachRemaining(x -> providers.add(x));
+    loader.iterator().forEachRemaining(providers::add);
     return providers;
   }
 
@@ -84,7 +84,7 @@ public static PipelineRel.Implementor plan(RelRoot root, List
-    PipelineRules.rules().forEach(x -> planner.addRule(x));
+    PipelineRules.rules().forEach(planner::addRule);
     PipelineRel plan = (PipelineRel) program.run(planner, root.rel, traitSet, materializations, Collections.emptyList());
     PipelineRel.Implementor implementor = new PipelineRel.Implementor(root.fields, parseHints(connectionProperties));
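Both providers() methods touched above follow the same discovery idiom that the forEachRemaining(providers::add) change cleans up. A generalized sketch (Providers is a hypothetical helper; the real methods hard-code ConnectorProvider and DeployerProvider):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;

    final class Providers {
      private Providers() {
      }

      // Collects every implementation registered under META-INF/services for the
      // given provider interface, using the same method-reference style as the diff.
      static <P> List<P> load(Class<P> providerClass) {
        ServiceLoader<P> loader = ServiceLoader.load(providerClass);
        List<P> providers = new ArrayList<>();
        loader.iterator().forEachRemaining(providers::add);
        return providers;
      }
    }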

diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java
index 74d763ae..5ef334a9 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java
@@ -146,11 +146,11 @@ public String getOrDefault(String key, Supplier f) {
 
   /**
    * Replaces `{{var}}` in a template file with the corresponding variable.
-   *
+   * <p>
    * Default values can be supplied with `{{var:default}}`.
-   *
+   * <p>
    * Built-in transformations can be applied to variables, including:
-   *
+   * <p>
    * - `{{var toName}}`, `{{var:default toName}}`: canonicalize the
    *   variable as a valid K8s object name.
    * - `{{var toUpperCase}}`, `{{var:default toUpperCase}}`: render in
@@ -160,24 +160,24 @@ public String getOrDefault(String key, Supplier f) {
-   * - `{{var concat}}`, `{{var:default concat}}`: concatinate a multiline
+   * - `{{var concat}}`, `{{var:default concat}}`: concatenate a multiline
    *   string into one line
    * - `{{var concat toUpperCase}}`: apply both transformations in sequence.
-   *
+   * <p>
    * If `var` contains multiple lines, the behavior depends on context;
    * specifically, whether the pattern appears within a list or comment
    * (prefixed with `-` or `#`). For example, if the template includes:
-   *
+   * <p>
    * - {{var}}
-   *
+   * <p>
    * ...and `var` contains multiple lines, then the output will be:
-   *
+   * <p>
    * - value line 1
    * - value line 2
-   *
+   * <p>
    * To avoid this behavior (and just get a multiline string), use one of
    * YAML's multiline markers, e.g.
-   *
+   * <p>
    * - |
    *   {{var}}
-   *
+   * <p>
    * In either case, the multiline string will be properly indented.
    */
 class SimpleTemplate implements Template {
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java
index e8e147de..16df63b0 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java
@@ -65,7 +65,7 @@ public RelNode convert(RelNode rel) {
 
   private class RemoteTableScan extends TableScan implements RemoteRel {
 
-    public RemoteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    RemoteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
       super(cluster, traitSet, Collections.emptyList(), table);
     }
   }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java
index f2479517..555cc907 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java
@@ -46,7 +46,7 @@ public void register(RelOptPlanner planner) {
     super.register(planner);
     planner.addRule(PipelineRules.PipelineTableScanRule.create(this));
     planner.addRule(PipelineRules.PipelineTableModifyRule.create(this));
-    PipelineRules.rules().forEach(x -> planner.addRule(x));
+    PipelineRules.rules().forEach(planner::addRule);
     engines().forEach(x -> new EngineRules(x).register(this, planner, connectionProperties));
   }
 }
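The Template javadoc above documents the `{{var}}` / `{{var:default}}` contract. A toy rendering of just that substitution rule (TinyTemplate is illustrative only; the real SimpleTemplate additionally handles transformations like toName and the multiline indentation behavior):

    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class TinyTemplate {
      // Matches {{var}} and {{var:default}}; group(1) is the name, group(2) the default.
      private static final Pattern VAR = Pattern.compile("\\{\\{(\\w+)(?::([^}]*))?\\}\\}");

      private TinyTemplate() {
      }

      static String render(String template, Map<String, String> vars) {
        Matcher m = VAR.matcher(template);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
          String value = vars.getOrDefault(m.group(1), m.group(2)); // fall back to the default
          if (value == null) {
            throw new IllegalArgumentException("No value for " + m.group(1));
          }
          m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
      }
    }

For example, render("name: {{name:sample}}", Map.of()) yields "name: sample".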

* "Convention" here just means a target set of "traits" the planner should * aim for. We can ask the planner to convert a query into the PIPELINE * convention, and the result will be a PipelineRel. This in turn can be @@ -71,7 +71,7 @@ public void visit(RelNode node) throws SQLException { /** * Adds a source to the pipeline. - * + *

* This involves deploying any relevant objects, and configuring * a connector. The connector is configured via `CREATE TABLE...WITH(...)`. */ @@ -83,7 +83,7 @@ public void addSource(String database, List path, RelDataType rowType, M /** * Sets the sink to use for the pipeline. - * + *

* By default, the sink is `PIPELINE.SINK`. An expected row type is required * for validation purposes. */ diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index d8a513e8..d2fed242 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -95,7 +95,6 @@ static class PipelineTableScan extends TableScan implements PipelineRel { @Override public void implement(Implementor implementor) throws SQLException { - RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); implementor.addSource(database, table.getQualifiedName(), table.getRowType(), Collections.emptyMap()); // TODO pass in table scan hints } @@ -386,7 +385,7 @@ public void implement(Implementor implementor) { } static Table findTable(CalciteSchema schema, List qualifiedName) { - if (qualifiedName.size() == 0) { + if (qualifiedName.isEmpty()) { throw new IllegalArgumentException("Empty qualified name."); } else if (qualifiedName.size() == 1) { String name = qualifiedName.get(0); @@ -413,14 +412,14 @@ static Table findTable(CalciteSchema schema, String table) { } static CalciteSchema schema(RelNode node) { - return (CalciteSchema) Optional.ofNullable(node.getTable()) + return Optional.ofNullable(node.getTable()) .map(x -> x.unwrap(CalciteSchema.class)) .orElseThrow(() -> new IllegalArgumentException("null table?")); } static List qualifiedName(RelNode node) { return Optional.ofNullable(node.getTable()) - .map(x -> x.getQualifiedName()) + .map(RelOptTable::getQualifiedName) .orElseThrow(() -> new IllegalArgumentException("null table?")); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java index 8f9a1bb7..c9d4aeb4 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java @@ -133,49 +133,47 @@ private SqlString generateSql(SqlDialect dialect) { System.out.println("[" + sql + "]"); } Hook.QUERY_PLAN.run(sql); - final Expression sql_ = + final Expression sqlInternal = builder0.append("sql", Expressions.constant(sql)); final int fieldCount = getRowType().getFieldCount(); BlockBuilder builder = new BlockBuilder(); - final ParameterExpression resultSet_ = + final ParameterExpression resultSet = Expressions.parameter(Modifier.FINAL, ResultSet.class, builder.newName("resultSet")); final SqlDialect.CalendarPolicy calendarPolicy = AnsiSqlDialect.DEFAULT.getCalendarPolicy(); // TODO hard-coded dialect - final Expression calendar_; - switch (calendarPolicy) { - case LOCAL: - calendar_ = + final Expression calendar; + if (requireNonNull(calendarPolicy) == SqlDialect.CalendarPolicy.LOCAL) { + calendar = builder0.append("calendar", Expressions.call(Calendar.class, "getInstance", getTimeZoneExpression(implementor))); - break; - default: - calendar_ = null; + } else { + calendar = null; } if (fieldCount == 1) { - final ParameterExpression value_ = + final ParameterExpression value = Expressions.parameter(Object.class, builder.newName("value")); - builder.add(Expressions.declare(Modifier.FINAL, value_, null)); - generateGet(implementor, 
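Per the addSource() javadoc above, connectors are configured via CREATE TABLE...WITH(...). A hypothetical rendering of that DDL shape from a config map (ConnectorDdl is a sketch; the project's actual DDL generation lives in ScriptImplementor, further below):

    import java.util.Map;
    import java.util.stream.Collectors;

    final class ConnectorDdl {
      private ConnectorDdl() {
      }

      // Renders: CREATE TABLE `name` (columnSpecs) WITH ('k' = 'v', ...);
      static String createTable(String name, String columnSpecs, Map<String, String> configs) {
        String with = configs.entrySet().stream()
            .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
            .collect(Collectors.joining(", "));
        return "CREATE TABLE `" + name + "` (" + columnSpecs + ") WITH (" + with + ");";
      }
    }

createTable("BAR", "NAME VARCHAR", Map.of("key1", "value1")) yields the same shape as the example in the ScriptImplementor javadoc below.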
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java
index 8f9a1bb7..c9d4aeb4 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java
@@ -133,49 +133,47 @@ private SqlString generateSql(SqlDialect dialect) {
       System.out.println("[" + sql + "]");
     }
     Hook.QUERY_PLAN.run(sql);
-    final Expression sql_ =
+    final Expression sqlInternal =
         builder0.append("sql", Expressions.constant(sql));
     final int fieldCount = getRowType().getFieldCount();
     BlockBuilder builder = new BlockBuilder();
-    final ParameterExpression resultSet_ =
+    final ParameterExpression resultSet =
         Expressions.parameter(Modifier.FINAL, ResultSet.class,
             builder.newName("resultSet"));
     final SqlDialect.CalendarPolicy calendarPolicy =
         AnsiSqlDialect.DEFAULT.getCalendarPolicy(); // TODO hard-coded dialect
-    final Expression calendar_;
-    switch (calendarPolicy) {
-    case LOCAL:
-      calendar_ =
+    final Expression calendar;
+    if (requireNonNull(calendarPolicy) == SqlDialect.CalendarPolicy.LOCAL) {
+      calendar =
           builder0.append("calendar",
               Expressions.call(Calendar.class, "getInstance",
                   getTimeZoneExpression(implementor)));
-      break;
-    default:
-      calendar_ = null;
+    } else {
+      calendar = null;
     }
     if (fieldCount == 1) {
-      final ParameterExpression value_ =
+      final ParameterExpression value =
          Expressions.parameter(Object.class, builder.newName("value"));
-      builder.add(Expressions.declare(Modifier.FINAL, value_, null));
-      generateGet(implementor, physType, builder, resultSet_, 0, value_,
-          calendar_, calendarPolicy);
-      builder.add(Expressions.return_(null, value_));
+      builder.add(Expressions.declare(Modifier.FINAL, value, null));
+      generateGet(implementor, physType, builder, resultSet, 0, value,
+          calendar, calendarPolicy);
+      builder.add(Expressions.return_(null, value));
     } else {
-      final Expression values_ =
+      final Expression values =
          builder.append("values",
              Expressions.newArrayBounds(Object.class, 1,
                  Expressions.constant(fieldCount)));
      for (int i = 0; i < fieldCount; i++) {
-        generateGet(implementor, physType, builder, resultSet_, i,
-            Expressions.arrayIndex(values_, Expressions.constant(i)),
-            calendar_, calendarPolicy);
+        generateGet(implementor, physType, builder, resultSet, i,
+            Expressions.arrayIndex(values, Expressions.constant(i)),
+            calendar, calendarPolicy);
      }
      builder.add(
-          Expressions.return_(null, values_));
+          Expressions.return_(null, values));
    }
-    final ParameterExpression e_ =
+    final ParameterExpression e =
        Expressions.parameter(SQLException.class, builder.newName("e"));
-    final Expression rowBuilderFactory_ =
+    final Expression rowBuilderFactory =
        builder0.append("rowBuilderFactory",
            Expressions.lambda(
                Expressions.block(
@@ -185,12 +183,12 @@ private SqlString generateSql(SqlDialect dialect) {
                    Expressions.tryCatch(
                        builder.toBlock(),
                        Expressions.catch_(
-                            e_,
+                            e,
                            Expressions.throw_(
                                Expressions.new_(
                                    RuntimeException.class,
-                                    e_)))))))),
-            resultSet_));
+                                    e)))))))),
+            resultSet));
 
     String dataSourceUrl = convention.engine().url();
     Expression dataSource = builder0.append("dataSource",
@@ -204,7 +202,7 @@ private SqlString generateSql(SqlDialect dialect) {
     if (sqlString.getDynamicParameters() != null
         && !sqlString.getDynamicParameters().isEmpty()) {
-      final Expression preparedStatementConsumer_ =
+      final Expression preparedStatementConsumer =
          builder0.append("preparedStatementConsumer",
              Expressions.call(BuiltInMethod.CREATE_ENRICHER.method,
                  Expressions.newArrayInit(Integer.class, 1,
@@ -216,17 +214,17 @@ private SqlString generateSql(SqlDialect dialect) {
          Expressions.call(
              BuiltInMethod.RESULT_SET_ENUMERABLE_OF_PREPARED.method,
              dataSource,
-              sql_,
-              rowBuilderFactory_,
-              preparedStatementConsumer_));
+              sqlInternal,
+              rowBuilderFactory,
+              preparedStatementConsumer));
     } else {
       enumerable =
          builder0.append("enumerable",
              Expressions.call(
                  BuiltInMethod.RESULT_SET_ENUMERABLE_OF.method,
                  dataSource,
-                  sql_,
-                  rowBuilderFactory_));
+                  sqlInternal,
+                  rowBuilderFactory));
     }
     builder0.add(
        Expressions.statement(
@@ -256,8 +254,8 @@ private static UnaryExpression getTimeZoneExpression(
   }
 
   private static void generateGet(EnumerableRelImplementor implementor,
-      PhysType physType, BlockBuilder builder, ParameterExpression resultSet_,
-      int i, Expression target, @Nullable Expression calendar_,
+      PhysType physType, BlockBuilder builder, ParameterExpression resultSet,
+      int i, Expression target, @Nullable Expression calendar,
       SqlDialect.CalendarPolicy calendarPolicy) {
     final Primitive primitive = Primitive.ofBoxOr(physType.fieldClass(i));
     final RelDataType fieldType =
@@ -268,8 +266,8 @@ private static void generateGet(EnumerableRelImplementor implementor,
     boolean offset = false;
     switch (calendarPolicy) {
     case LOCAL:
-      requireNonNull(calendar_, "calendar_");
-      dateTimeArgs.add(calendar_);
+      requireNonNull(calendar, "calendar_");
+      dateTimeArgs.add(calendar);
       break;
     case NULL:
       // We don't specify a calendar at all, so we don't add an argument and
@@ -301,14 +299,14 @@ private static void generateGet(EnumerableRelImplementor implementor,
           getMethod(sqlTypeName, fieldType.isNullable(), offset),
           Expressions.list()
               .append(
-                  Expressions.call(resultSet_,
+                  Expressions.call(resultSet,
                       getMethod2(sqlTypeName), dateTimeArgs))
               .appendIf(offset, getTimeZoneExpression(implementor)));
       break;
     case ARRAY:
       final Expression x = Expressions.convert_(
-          Expressions.call(resultSet_, jdbcGetMethod(primitive),
+          Expressions.call(resultSet, jdbcGetMethod(primitive),
               Expressions.constant(i + 1)),
           java.sql.Array.class);
       source = Expressions.call(BuiltInMethod.JDBC_ARRAY_TO_LIST.method, x);
@@ -318,7 +316,7 @@ private static void generateGet(EnumerableRelImplementor implementor,
       break;
     default:
       source =
-          Expressions.call(resultSet_, jdbcGetMethod(primitive),
+          Expressions.call(resultSet, jdbcGetMethod(primitive),
               Expressions.constant(i + 1));
       break;
     }
@@ -332,7 +330,7 @@ private static void generateGet(EnumerableRelImplementor implementor,
     if (primitive != null) {
       builder.add(
           Expressions.ifThen(
-              Expressions.call(resultSet_, "wasNull"),
+              Expressions.call(resultSet, "wasNull"),
               Expressions.statement(
                   Expressions.assign(target,
                       Expressions.constant(null)))));
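At runtime, the generated rowBuilderFactory above amounts to copying the current ResultSet row into an Object[]. A plain-JDBC sketch of what the generated expression tree does (Rows is illustrative; the real generated code also applies calendar and per-type handling via generateGet):

    import java.sql.ResultSet;
    import java.sql.SQLException;

    final class Rows {
      private Rows() {
      }

      static Object[] currentRow(ResultSet resultSet, int fieldCount) throws SQLException {
        Object[] values = new Object[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
          values[i] = resultSet.getObject(i + 1); // JDBC columns are 1-indexed
        }
        return values;
      }
    }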

diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
index 94938e0e..53859acd 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
@@ -48,21 +48,21 @@
 
 /**
  * An abstract way to write SQL scripts.
- *
+ * <p>
 * To generate a specific statement, implement this interface, or use one
 * of the provided implementations, e.g. `QueryImplementor`.
- *
+ * <p>
 * To generate a script (more than one statement), start with `empty()`
 * and append subsequent ScriptImplementors with `with(...)` etc.
- *
+ * <p>
 * e.g.
- *
+ * <p>
 *   ScriptImplementor.empty()
 *     .database(db)
 *     .connector(name, rowType, configs)
- *
+ * <p>
 * ... would produce something like
- *
+ * <p>
 *   CREATE DATABASE IF NOT EXIST `FOO`;
 *   CREATE TABLE `BAR` (NAME VARCHAR) WITH ('key1' = 'value1');
 */
@@ -189,7 +189,7 @@ public SqlNode visit(SqlCall call) {
 
   /**
    * Implements a CREATE TABLE...WITH... DDL statement.
-   *
+   * <p>
    * N.B. the following magic:
    * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
    * - NULL fields are promoted to BYTES
@@ -248,10 +248,10 @@ public void implement(SqlWriter w) {
   }
 
   /** Implements an INSERT INTO statement.
-   *
+   * <p>
    * N.B. the following magic:
    * - NULL columns (e.g. `NULL AS KEY`) are elided from the pipeline
-   *
+   * <p>
    * */
  class InsertImplementor implements ScriptImplementor {
    private final String schema;
@@ -364,7 +364,7 @@ public CompoundIdentifierImplementor(String schema, String table) {
 
    @Override
    public void implement(SqlWriter w) {
-      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(new String[]{schema, table}), SqlParserPos.ZERO);
+      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(schema, table), SqlParserPos.ZERO);
      identifier.unparse(w, 0, 0);
    }
  }
@@ -379,13 +379,13 @@ public IdentifierImplementor(String name) {
 
    @Override
    public void implement(SqlWriter w) {
-      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(new String[]{name}), SqlParserPos.ZERO);
+      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(name), SqlParserPos.ZERO);
      identifier.unparse(w, 0, 0);
    }
  }
 
  /** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`.
-   *
+   * <p>
   * N.B. the following magic:
   * - NULL fields are promoted to BYTES
   * - Flattened fields like FOO$BAR are renamed FOO_BAR
@@ -403,12 +403,12 @@ public void implement(SqlWriter w) {
      RelDataType flattened = dataType;
      List<SqlIdentifier> fieldNames = flattened.getFieldList()
          .stream()
-          .map(x -> x.getName())
+          .map(RelDataTypeField::getName)
          .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR
          .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
          .collect(Collectors.toList());
      List<SqlDataTypeSpec> fieldTypes =
-          flattened.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList());
+          flattened.getFieldList().stream().map(RelDataTypeField::getType).map(RowTypeSpecImplementor::toSpec).collect(Collectors.toList());
      for (int i = 0; i < fieldNames.size(); i++) {
        w.sep(",");
        fieldNames.get(i).unparse(w, 0, 0);
@@ -499,13 +499,13 @@ public ColumnListImplementor(List fields) {
    public void implement(SqlWriter w) {
      SqlWriter.Frame frame1 = w.startList("(", ")");
      List<SqlIdentifier> fieldNames = fields.stream()
-          .map(x -> x.getName())
+          .map(RelDataTypeField::getName)
          .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR
          .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
          .collect(Collectors.toList());
-      for (int i = 0; i < fieldNames.size(); i++) {
+      for (SqlIdentifier fieldName : fieldNames) {
        w.sep(",");
-        fieldNames.get(i).unparse(w, 0, 0);
+        fieldName.unparse(w, 0, 0);
      }
      w.endList(frame1);
    }
diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
index f7a8adb8..833d4ff1 100644
--- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
+++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
@@ -18,7 +18,7 @@ class DeploymentServiceTest {
    * "hint" keys and values are required to be non-{@code null}. An
    * empty {@link Map} is considered invalid and should not be added
    * to the {@link Properties} object.
-   *
+   * <p>
    * nb. "pipeline" values are always added when present.
    */
   @Test