Make lots of checkstyle changes - plan to enable blocking PRs soon, still some more involved issues #133

Merged
merged 3 commits on May 6, 2025
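The title flags an intent to make checkstyle a blocking check soon. As a rough local preview of what that gate would enforce, the checks can be run before pushing; this is a sketch only, assuming the standard Gradle Checkstyle plugin task names, which this page does not show:

```
# Sketch: run Checkstyle locally before pushing (standard Gradle plugin task names assumed)
./gradlew checkstyleMain checkstyleTest
```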
29 changes: 13 additions & 16 deletions Makefile
@@ -11,7 +11,7 @@ build:
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator

bounce: build undeploy deploy deploy-samples deploy-config
bounce: build undeploy deploy

clean:
./gradlew clean
@@ -26,6 +26,10 @@ deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy
kubectl apply -f ./deploy/dev/rbac.yaml
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
crds/kafkatopics.hoptimator.linkedin.com \
crds/sqljobs.hoptimator.linkedin.com

undeploy: undeploy-config
kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping"
@@ -45,9 +49,9 @@ deploy-flink:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f deploy/dev/flink-session-cluster.yaml
kubectl apply -f deploy/dev/flink-sql-gateway.yaml
kubectl apply -f deploy/samples/flink-template.yaml
kubectl apply -f ./deploy/dev/flink-session-cluster.yaml
kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml
kubectl apply -f ./deploy/samples/flink-template.yaml

undeploy-flink:
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
@@ -61,8 +65,12 @@ undeploy-flink:
deploy-kafka: deploy deploy-flink
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl apply -f ./deploy/dev/kafka.yaml
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io --all || echo "skipping"
@@ -84,20 +92,12 @@ undeploy-venice:
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-demo deploy-flink deploy-kafka deploy-venice
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
crds/kafkatopics.hoptimator.linkedin.com \
crds/sqljobs.hoptimator.linkedin.com
kubectl apply -f ./deploy/dev/

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy-demo undeploy
kubectl delete -f ./deploy/dev || echo "skipping"

# Integration test setup intended to be run locally
integration-tests: deploy-dev-environment
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
@@ -108,9 +108,6 @@ integration-tests: deploy-dev-environment

# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
integration-tests-kind: deploy-dev-environment
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
./gradlew intTest -i --no-parallel

generate-models:
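With these changes, `deploy` itself waits for the Hoptimator CRDs to reach the Established condition, and the Kafka readiness waits move from the integration-test targets into `deploy-kafka`. A quick manual check after `make deploy` might look like the sketch below (the CRD names are the ones listed in the Makefile above):

```
# Sketch: verify the CRDs applied by `make deploy` are registered and Established
kubectl get crd subscriptions.hoptimator.linkedin.com \
  kafkatopics.hoptimator.linkedin.com \
  sqljobs.hoptimator.linkedin.com
kubectl wait --for=condition=Established=True crds/subscriptions.hoptimator.linkedin.com
```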
4 changes: 2 additions & 2 deletions README.md
@@ -76,8 +76,8 @@ Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolation
To produce/consume Kafka data, use the following commands:

```
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
```

### Flink
52 changes: 42 additions & 10 deletions deploy/dev/kafka.yaml
@@ -15,15 +15,56 @@
# Based on examples at:
# https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: controller
namespace: kafka
labels:
strimzi.io/cluster: one
spec:
replicas: 3
roles:
- controller
storage:
type: jbod
volumes:
- id: 0
type: ephemeral
kraftMetadata: shared
---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker
namespace: kafka
labels:
strimzi.io/cluster: one
spec:
replicas: 3
roles:
- broker
storage:
type: jbod
volumes:
- id: 0
type: ephemeral
kraftMetadata: shared
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: one
namespace: kafka
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.8.0
version: 4.0.0
metadataVersion: "4.0"
replicas: 1
listeners:
- name: plain
@@ -54,16 +54,7 @@ spec:
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.8"
allow.everyone.if.no.acl.found: true
storage:
type: ephemeral
authorization:
type: simple
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator:
watchedNamespace: default
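This manifest moves the `one` cluster from ZooKeeper-backed Kafka 3.8.0 to Kafka 4.0.0 in KRaft mode, with separate `controller` and `broker` KafkaNodePools using ephemeral JBOD storage. A rough way to confirm the rollout, assuming the Strimzi operator has been installed via the Makefile targets above:

```
# Sketch: confirm the node pools exist and the cluster reports Ready
kubectl get kafkanodepools.kafka.strimzi.io -n kafka
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
```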
@@ -39,7 +39,7 @@ public List<String> path() {
}

protected String pathString() {
return path.stream().collect(Collectors.joining("."));
return String.join(".", path);
}

@Override
@@ -105,7 +105,7 @@ public boolean valid() {
/** For convenience only, enabling try-with-resources */
public void close() {
closed = true;
children.values().forEach(x -> x.checkClosed());
children.values().forEach(Issues::checkClosed);
}

private void emit(String message) {
@@ -130,7 +130,7 @@ private void checkClosed() {
}

private boolean empty() {
return issues.isEmpty() && children.values().stream().allMatch(x -> x.empty());
return issues.isEmpty() && children.values().stream().allMatch(Issues::empty);
}

private String fullPath() {
@@ -32,7 +32,7 @@ public List<String> path() {
}

protected String pathString() {
return path.stream().collect(Collectors.joining("."));
return String.join(".", path);
}

@Override
@@ -30,19 +30,17 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
} else {
switch (dataType.getSqlTypeName()) {
case INTEGER:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case SMALLINT:
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
case BIGINT:
return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
case VARCHAR:
case CHAR:
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
case FLOAT:
return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable());
case DOUBLE:
return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
case CHAR:
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
case BOOLEAN:
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
case ARRAY:
@@ -24,7 +24,7 @@ class AvroTableValidator implements Validator {

private final SchemaPlus schema;

public AvroTableValidator(SchemaPlus schema) {
AvroTableValidator(SchemaPlus schema) {
this.schema = schema;
}

@@ -12,7 +12,6 @@
/** Provides AvroValidator. */
public class AvroValidatorProvider implements ValidatorProvider {

@SuppressWarnings("unchecked")
@Override
public <T> Collection<Validator> validators(T obj) {
if (obj instanceof SchemaPlus) {
@@ -22,7 +22,7 @@ static ConfigProvider from(Map<String, ?> configs) {
if (configs == null) {
return empty();
} else {
return x -> configs.entrySet().stream().collect(Collectors.toMap(y -> y.getKey(), y -> y.getValue().toString()));
return x -> configs.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().toString()));
}
}

@@ -32,8 +32,7 @@ default ConfigProvider with(String key, Function<String, String> valueFunction)
if (base.containsKey(key)) {
throw new IllegalStateException("Key '" + key + "' previously defined.");
}
Map<String, String> combined = new HashMap<>();
combined.putAll(base);
Map<String, String> combined = new HashMap<>(base);
combined.put(key, valueFunction.apply(x));
return combined;
};
@@ -45,8 +44,7 @@ default ConfigProvider with(Map<String, ?> configs) {
}
return x -> {
Map<String, String> base = config(x);
Map<String, String> combined = new HashMap<>();
combined.putAll(base);
Map<String, String> combined = new HashMap<>(base);
configs.forEach((k, v) -> {
if (base.containsKey(k)) {
throw new IllegalStateException("Key '" + k + "' previously defined.");
@@ -66,6 +64,6 @@ default ConfigProvider with(String key, Integer value) {
}

default ConfigProvider withPrefix(String prefix) {
return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), y -> y.getValue()));
return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), Map.Entry::getValue));
}
}
@@ -12,7 +12,7 @@
import org.apache.calcite.sql.type.SqlTypeName;


/** Common data types. Not authoratitive or exhaustive. */
/** Common data types. Not authoritative or exhaustive. */
public enum DataType {

VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
@@ -6,7 +6,7 @@

/**
* Calling convention that ultimately gets converted to a Pipeline or similar.
*
* <p>
* For now, Hoptimator only implements Pipelines (via PipelineRel), which run on
* Flink. Eventually, PipelineRel may support additional runtimes (e.g. Spark),
* and/or Hoptimator may support additional calling conventions (e.g. batch jobs
@@ -17,7 +17,7 @@

/**
* HopTables can have "baggage", including Resources and arbitrary DDL/SQL.
*
* <p>
* This mechanism is extremely powerful. In addition to enabling views, we can
* bring along arbitrary infra required to materialize a view. For example, a
* table can bring along a CDC stream or a cache. Generally, such Resources
@@ -20,7 +20,7 @@ public final class HopTableScan extends TableScan implements HopRel {
@Override
public void register(RelOptPlanner planner) {
planner.addRule(HopTableScanRule.INSTANCE);
RuleService.rules().forEach(x -> planner.addRule(x));
RuleService.rules().forEach(planner::addRule);
}
}
