diff --git a/README.md b/README.md index 3cfc7c9..5fd1f83 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ docker-compose up In the Zeebe configuration file, you can change * the Hazelcast port +* the Hazelcast cluster name * the value and record types which are exported * the ringbuffer's name * the ringbuffer's capacity @@ -113,8 +114,11 @@ zeebe: className: io.zeebe.hazelcast.exporter.HazelcastExporter jarPath: exporters/zeebe-hazelcast-exporter.jar args: - # Hazelcast port + # Hazelcast port port = 5701 + + # Hazelcast cluster name + clusterName = "dev" # comma separated list of io.zeebe.protocol.record.ValueType to export or empty to export all types enabledValueTypes = "" diff --git a/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java b/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java index 65e3121..a12d895 100644 --- a/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java +++ b/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java @@ -10,6 +10,8 @@ public class ExporterConfiguration { private String remoteAddress; + private String clusterName = "dev"; + private String name = "zeebe"; private int capacity = -1; @@ -54,6 +56,10 @@ public Optional getRemoteAddress() { .filter(remoteAddress -> !remoteAddress.isEmpty()); } + public String getClusterName() { + return getEnv("CLUSTER_NAME").orElse(clusterName); + } + private Optional getEnv(String name) { return Optional.ofNullable(System.getenv(ENV_PREFIX + name)); } @@ -61,21 +67,23 @@ private Optional getEnv(String name) { @Override public String toString() { return "[port=" - + port - + ", remoteAddress=" - + remoteAddress - + ", name=" - + name - + ", enabledValueTypes=" - + enabledValueTypes - + ", enabledRecordTypes=" - + enabledRecordTypes - + ", capacity=" - + capacity - + ", timeToLiveInSeconds=" - + timeToLiveInSeconds - + ", format=" - + format - + "]"; + + port + + ", remoteAddress=" + + remoteAddress + + ", name=" + + name + + ", clusterName=" + + clusterName + + ", enabledValueTypes=" + + enabledValueTypes + + ", enabledRecordTypes=" + + enabledRecordTypes + + ", capacity=" + + capacity + + ", timeToLiveInSeconds=" + + timeToLiveInSeconds + + ", format=" + + format + + "]"; } } diff --git a/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java b/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java index 4b6842c..e659e88 100644 --- a/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java +++ b/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java @@ -84,11 +84,14 @@ public void open(Controller controller) { private HazelcastInstance createHazelcastInstance() { final var port = this.config.getPort(); + final var clusterName = this.config.getClusterName(); final var hzConfig = new Config(); hzConfig.getNetworkConfig().setPort(port); hzConfig.setProperty("hazelcast.logging.type", "slf4j"); + hzConfig.setClusterName(clusterName); + final var ringbufferConfig = new RingbufferConfig(this.config.getName()); if (this.config.getCapacity() > 0) { @@ -100,12 +103,16 @@ private HazelcastInstance createHazelcastInstance() { hzConfig.addRingBufferConfig(ringbufferConfig); - logger.info("Creating new in-memory Hazelcast instance [port: {}]", port); + logger.info( + "Creating new in-memory Hazelcast instance [port: {}, cluster-name: {}]", + port, + clusterName); return Hazelcast.newHazelcastInstance(hzConfig); } private HazelcastInstance connectToHazelcast(String remoteAddress) { + final var clusterName = this.config.getClusterName(); final var clientConfig = new ClientConfig(); clientConfig.setProperty("hazelcast.logging.type", "slf4j"); @@ -113,7 +120,12 @@ private HazelcastInstance connectToHazelcast(String remoteAddress) { final var networkConfig = clientConfig.getNetworkConfig(); networkConfig.addAddress(remoteAddress); - logger.info("Connecting to remote Hazelcast instance [address: {}]", remoteAddress); + clientConfig.setClusterName(clusterName); + + logger.info( + "Connecting to remote Hazelcast instance [address: {}, cluster-name: {}]", + remoteAddress, + clusterName); return HazelcastClient.newHazelcastClient(clientConfig); } diff --git a/exporter/src/test/java/io/zeebe/hazelcast/ExporterClusterNameTest.java b/exporter/src/test/java/io/zeebe/hazelcast/ExporterClusterNameTest.java new file mode 100644 index 0000000..fa595f8 --- /dev/null +++ b/exporter/src/test/java/io/zeebe/hazelcast/ExporterClusterNameTest.java @@ -0,0 +1,78 @@ +package io.zeebe.hazelcast; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.ringbuffer.Ringbuffer; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.test.ZeebeTestRule; +import io.zeebe.exporter.proto.Schema; +import io.zeebe.hazelcast.exporter.ExporterConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ExporterClusterNameTest { + + private static final BpmnModelInstance WORKFLOW = + Bpmn.createExecutableProcess("process") + .startEvent("start") + .sequenceFlowId("to-task") + .serviceTask("task", s -> s.zeebeJobType("test")) + .sequenceFlowId("to-end") + .endEvent("end") + .done(); + + private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration(); + + @Rule + public final ZeebeTestRule testRule = + new ZeebeTestRule("application-cluster-name.yaml", Properties::new); + + private ZeebeClient client; + private HazelcastInstance hz; + + @Before + public void init() { + client = testRule.getClient(); + + final ClientConfig clientConfig = new ClientConfig(); + clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701"); + clientConfig.setClusterName("test"); + hz = HazelcastClient.newHazelcastClient(clientConfig); + } + + @After + public void cleanUp() { + hz.shutdown(); + } + + @Test + public void shouldExportEventsAsProtobuf() throws Exception { + // given + final Ringbuffer buffer = hz.getRingbuffer(CONFIGURATION.getName()); + + var sequence = buffer.headSequence(); + + // when + client.newDeployCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + + // then + final var message = buffer.readOne(sequence); + assertThat(message).isNotNull(); + + final var record = Schema.Record.parseFrom(message); + assertThat(record.getRecord().is(Schema.DeploymentRecord.class)).isTrue(); + + final var deploymentRecord = record.getRecord().unpack(Schema.DeploymentRecord.class); + final Schema.DeploymentRecord.Resource resource = deploymentRecord.getResources(0); + assertThat(resource.getResourceName()).isEqualTo("process.bpmn"); + } +} diff --git a/exporter/src/test/java/io/zeebe/hazelcast/RemoteExporterClusterNameTest.java b/exporter/src/test/java/io/zeebe/hazelcast/RemoteExporterClusterNameTest.java new file mode 100644 index 0000000..fb8fa74 --- /dev/null +++ b/exporter/src/test/java/io/zeebe/hazelcast/RemoteExporterClusterNameTest.java @@ -0,0 +1,87 @@ +package io.zeebe.hazelcast; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.ringbuffer.Ringbuffer; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.test.ZeebeTestRule; +import io.zeebe.exporter.proto.Schema; +import io.zeebe.hazelcast.exporter.ExporterConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RemoteExporterClusterNameTest { + + private static final BpmnModelInstance WORKFLOW = + Bpmn.createExecutableProcess("process") + .startEvent("start") + .sequenceFlowId("to-task") + .serviceTask("task", s -> s.zeebeJobType("test")) + .sequenceFlowId("to-end") + .endEvent("end") + .done(); + + private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration(); + + @Rule + public final ZeebeTestRule testRule = + new ZeebeTestRule("application-remote-cluster-name.yaml", Properties::new); + + private ZeebeClient client; + private HazelcastInstance hzInstance; + private HazelcastInstance hzClient; + + @Before + public void init() { + client = testRule.getClient(); + + final Config config = new Config(); + config.getNetworkConfig().setPort(5702); + config.setClusterName("test"); + hzInstance = Hazelcast.newHazelcastInstance(config); + + final ClientConfig clientConfig = new ClientConfig(); + clientConfig.getNetworkConfig().addAddress("127.0.0.1:5702"); + clientConfig.setClusterName("test"); + hzClient = HazelcastClient.newHazelcastClient(clientConfig); + } + + @After + public void cleanUp() { + hzClient.shutdown(); + hzInstance.shutdown(); + } + + @Test + public void shouldExportEventsAsProtobuf() throws Exception { + // given + final Ringbuffer buffer = hzClient.getRingbuffer(CONFIGURATION.getName()); + + var sequence = buffer.headSequence(); + + // when + client.newDeployCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + + // then + final var message = buffer.readOne(sequence); + assertThat(message).isNotNull(); + + final var record = Schema.Record.parseFrom(message); + assertThat(record.getRecord().is(Schema.DeploymentRecord.class)).isTrue(); + + final var deploymentRecord = record.getRecord().unpack(Schema.DeploymentRecord.class); + final Schema.DeploymentRecord.Resource resource = deploymentRecord.getResources(0); + assertThat(resource.getResourceName()).isEqualTo("process.bpmn"); + } +} diff --git a/exporter/src/test/resources/application-cluster-name.yaml b/exporter/src/test/resources/application-cluster-name.yaml new file mode 100644 index 0000000..ee9b45c --- /dev/null +++ b/exporter/src/test/resources/application-cluster-name.yaml @@ -0,0 +1,7 @@ +zeebe: + broker: + exporters: + hazelcast: + className: io.zeebe.hazelcast.exporter.HazelcastExporter + args: + clusterName: test diff --git a/exporter/src/test/resources/application-remote-cluster-name.yaml b/exporter/src/test/resources/application-remote-cluster-name.yaml new file mode 100644 index 0000000..da12aea --- /dev/null +++ b/exporter/src/test/resources/application-remote-cluster-name.yaml @@ -0,0 +1,8 @@ +zeebe: + broker: + exporters: + hazelcast: + className: io.zeebe.hazelcast.exporter.HazelcastExporter + args: + remoteAddress: 127.0.0.1:5702 + clusterName: test