|
48 | 48 | import io.strimzi.operator.cluster.model.logging.SupportsLogging;
|
49 | 49 | import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
|
50 | 50 | import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
|
| 51 | +import io.strimzi.operator.common.Annotations; |
51 | 52 | import io.strimzi.operator.common.Reconciliation;
|
52 | 53 | import io.strimzi.operator.common.Util;
|
53 | 54 | import io.strimzi.operator.common.model.InvalidResourceException;
|
@@ -86,29 +87,23 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
|
86 | 87 | // Kafka Bridge configuration keys (EnvVariables)
|
87 | 88 | protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
|
88 | 89 | protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
|
89 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS = "KAFKA_BRIDGE_BOOTSTRAP_SERVERS"; |
90 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_TLS = "KAFKA_BRIDGE_TLS"; |
91 | 90 | protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
|
92 | 91 | protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
|
93 |
| - protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING"; |
94 |
| - |
95 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG = "KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG"; |
96 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG = "KAFKA_BRIDGE_PRODUCER_CONFIG"; |
97 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG = "KAFKA_BRIDGE_CONSUMER_CONFIG"; |
98 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_ID = "KAFKA_BRIDGE_ID"; |
99 |
| - |
100 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST"; |
101 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT"; |
102 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT"; |
103 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED = "KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED"; |
104 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED = "KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED"; |
105 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED"; |
106 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS = "KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS"; |
107 |
| - protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS"; |
108 | 92 |
|
109 | 93 | protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
|
110 | 94 | protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";
|
111 | 95 |
|
| 96 | + /** |
| 97 | + * Key under which the bridge configuration is stored in ConfigMap |
| 98 | + */ |
| 99 | + public static final String BRIDGE_CONFIGURATION_FILENAME = "application.properties"; |
| 100 | + |
| 101 | + /** |
| 102 | + * Annotation for rolling the bridge whenever the configuration within the application.properties file is changed. |
| 103 | + * When the configuration hash annotation change is detected, we force a pod restart. |
| 104 | + */ |
| 105 | + public static final String ANNO_STRIMZI_IO_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "configuration-hash"; |
| 106 | + |
112 | 107 | private int replicas;
|
113 | 108 | private ClientTls tls;
|
114 | 109 | private KafkaClientAuthentication authentication;
|
@@ -411,59 +406,12 @@ protected List<EnvVar> getEnvVars() {
|
411 | 406 | varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
|
412 | 407 | JvmOptionUtils.javaOptions(varList, jvmOptions);
|
413 | 408 |
|
414 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers)); |
415 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration())); |
416 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster)); |
417 |
| - |
418 |
| - if (kafkaBridgeConsumer != null) { |
419 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration())); |
420 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled()))); |
421 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds()))); |
422 |
| - } else { |
423 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, "")); |
424 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, "true")); |
425 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT))); |
426 |
| - } |
427 |
| - |
428 |
| - if (kafkaBridgeProducer != null) { |
429 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration())); |
430 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled()))); |
431 |
| - } else { |
432 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, "")); |
433 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, "true")); |
434 |
| - } |
435 |
| - |
436 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST)); |
437 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT))); |
438 |
| - |
439 |
| - if (http != null && http.getCors() != null) { |
440 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "true")); |
441 |
| - |
442 |
| - if (http.getCors().getAllowedOrigins() != null) { |
443 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS, String.join(",", http.getCors().getAllowedOrigins()))); |
444 |
| - } |
445 |
| - |
446 |
| - if (http.getCors().getAllowedMethods() != null) { |
447 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS, String.join(",", http.getCors().getAllowedMethods()))); |
448 |
| - } |
449 |
| - } else { |
450 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "false")); |
451 |
| - } |
452 |
| - |
453 |
| - if (tls != null) { |
454 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true")); |
455 |
| - |
456 |
| - if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { |
457 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates()))); |
458 |
| - } |
| 409 | + if (tls != null && tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { |
| 410 | + varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates()))); |
459 | 411 | }
|
460 | 412 |
|
461 | 413 | AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name);
|
462 | 414 |
|
463 |
| - if (tracing != null) { |
464 |
| - varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_TRACING, tracing.getType())); |
465 |
| - } |
466 |
| - |
467 | 415 | // Add shared environment variables used for all containers
|
468 | 416 | varList.addAll(sharedEnvironmentProvider.variables());
|
469 | 417 |
|
@@ -600,21 +548,38 @@ protected List<EnvVar> getInitContainerEnvVars() {
|
600 | 548 | }
|
601 | 549 |
|
602 | 550 | /**
|
603 |
| - * Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging |
604 |
| - * or metrics, they will nto be set. |
| 551 | + * Generates a ConfigMap containing the bridge configuration related to HTTP and Kafka clients. |
| 552 | + * It also generates the metrics and logging configuration. If this operand doesn't support logging |
| 553 | + * or metrics, they will not be set. |
605 | 554 | *
|
606 | 555 | * @param metricsAndLogging The external CMs with logging and metrics configuration
|
607 | 556 | *
|
608 | 557 | * @return The generated ConfigMap
|
609 | 558 | */
|
610 |
| - public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) { |
| 559 | + public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) { |
| 560 | + // generate the ConfigMap data entries for the metrics and logging configuration |
| 561 | + Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging); |
| 562 | + // add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration |
| 563 | + data.put( |
| 564 | + BRIDGE_CONFIGURATION_FILENAME, |
| 565 | + new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers) |
| 566 | + .withTracing(tracing) |
| 567 | + .withTls(tls) |
| 568 | + .withAuthentication(authentication) |
| 569 | + .withKafkaAdminClient(kafkaBridgeAdminClient) |
| 570 | + .withKafkaProducer(kafkaBridgeProducer) |
| 571 | + .withKafkaConsumer(kafkaBridgeConsumer) |
| 572 | + .withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer) |
| 573 | + .build() |
| 574 | + ); |
| 575 | + |
611 | 576 | return ConfigMapUtils
|
612 | 577 | .createConfigMap(
|
613 | 578 | KafkaBridgeResources.metricsAndLogConfigMapName(cluster),
|
614 | 579 | namespace,
|
615 | 580 | labels,
|
616 | 581 | ownerReference,
|
617 |
| - ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging) |
| 582 | + data |
618 | 583 | );
|
619 | 584 | }
|
620 | 585 |
|
|
0 commit comments