Skip to content

feat(exporter): define the Hazelcast cluster name #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ docker-compose up
In the Zeebe configuration file, you can change

* the Hazelcast port
* the Hazelcast cluster name
* the value and record types which are exported
* the ringbuffer's name
* the ringbuffer's capacity
Expand All @@ -113,8 +114,11 @@ zeebe:
className: io.zeebe.hazelcast.exporter.HazelcastExporter
jarPath: exporters/zeebe-hazelcast-exporter.jar
args:
# Hazelcast port
# Hazelcast port
port = 5701

# Hazelcast cluster name
clusterName = "dev"

# comma separated list of io.zeebe.protocol.record.ValueType to export or empty to export all types
enabledValueTypes = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class ExporterConfiguration {

private String remoteAddress;

private String clusterName = "dev";

private String name = "zeebe";

private int capacity = -1;
Expand Down Expand Up @@ -54,28 +56,34 @@ public Optional<String> getRemoteAddress() {
.filter(remoteAddress -> !remoteAddress.isEmpty());
}

public String getClusterName() {
return getEnv("CLUSTER_NAME").orElse(clusterName);
}

private Optional<String> getEnv(String name) {
return Optional.ofNullable(System.getenv(ENV_PREFIX + name));
}

@Override
public String toString() {
return "[port="
+ port
+ ", remoteAddress="
+ remoteAddress
+ ", name="
+ name
+ ", enabledValueTypes="
+ enabledValueTypes
+ ", enabledRecordTypes="
+ enabledRecordTypes
+ ", capacity="
+ capacity
+ ", timeToLiveInSeconds="
+ timeToLiveInSeconds
+ ", format="
+ format
+ "]";
+ port
+ ", remoteAddress="
+ remoteAddress
+ ", name="
+ name
+ ", clusterName="
+ clusterName
+ ", enabledValueTypes="
+ enabledValueTypes
+ ", enabledRecordTypes="
+ enabledRecordTypes
+ ", capacity="
+ capacity
+ ", timeToLiveInSeconds="
+ timeToLiveInSeconds
+ ", format="
+ format
+ "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ public void open(Controller controller) {

private HazelcastInstance createHazelcastInstance() {
final var port = this.config.getPort();
final var clusterName = this.config.getClusterName();

final var hzConfig = new Config();
hzConfig.getNetworkConfig().setPort(port);
hzConfig.setProperty("hazelcast.logging.type", "slf4j");

hzConfig.setClusterName(clusterName);

final var ringbufferConfig = new RingbufferConfig(this.config.getName());

if (this.config.getCapacity() > 0) {
Expand All @@ -100,20 +103,29 @@ private HazelcastInstance createHazelcastInstance() {

hzConfig.addRingBufferConfig(ringbufferConfig);

logger.info("Creating new in-memory Hazelcast instance [port: {}]", port);
logger.info(
"Creating new in-memory Hazelcast instance [port: {}, cluster-name: {}]",
port,
clusterName);

return Hazelcast.newHazelcastInstance(hzConfig);
}

private HazelcastInstance connectToHazelcast(String remoteAddress) {
final var clusterName = this.config.getClusterName();

final var clientConfig = new ClientConfig();
clientConfig.setProperty("hazelcast.logging.type", "slf4j");

final var networkConfig = clientConfig.getNetworkConfig();
networkConfig.addAddress(remoteAddress);

logger.info("Connecting to remote Hazelcast instance [address: {}]", remoteAddress);
clientConfig.setClusterName(clusterName);

logger.info(
"Connecting to remote Hazelcast instance [address: {}, cluster-name: {}]",
remoteAddress,
clusterName);

return HazelcastClient.newHazelcastClient(clientConfig);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.zeebe.hazelcast;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.test.ZeebeTestRule;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.hazelcast.exporter.ExporterConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

public class ExporterClusterNameTest {

private static final BpmnModelInstance WORKFLOW =
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeJobType("test"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();

private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration();

@Rule
public final ZeebeTestRule testRule =
new ZeebeTestRule("application-cluster-name.yaml", Properties::new);

private ZeebeClient client;
private HazelcastInstance hz;

@Before
public void init() {
client = testRule.getClient();

final ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");
clientConfig.setClusterName("test");
hz = HazelcastClient.newHazelcastClient(clientConfig);
}

@After
public void cleanUp() {
hz.shutdown();
}

@Test
public void shouldExportEventsAsProtobuf() throws Exception {
// given
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.getName());

var sequence = buffer.headSequence();

// when
client.newDeployCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();

// then
final var message = buffer.readOne(sequence);
assertThat(message).isNotNull();

final var record = Schema.Record.parseFrom(message);
assertThat(record.getRecord().is(Schema.DeploymentRecord.class)).isTrue();

final var deploymentRecord = record.getRecord().unpack(Schema.DeploymentRecord.class);
final Schema.DeploymentRecord.Resource resource = deploymentRecord.getResources(0);
assertThat(resource.getResourceName()).isEqualTo("process.bpmn");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.zeebe.hazelcast;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.test.ZeebeTestRule;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.hazelcast.exporter.ExporterConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

public class RemoteExporterClusterNameTest {

private static final BpmnModelInstance WORKFLOW =
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeJobType("test"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();

private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration();

@Rule
public final ZeebeTestRule testRule =
new ZeebeTestRule("application-remote-cluster-name.yaml", Properties::new);

private ZeebeClient client;
private HazelcastInstance hzInstance;
private HazelcastInstance hzClient;

@Before
public void init() {
client = testRule.getClient();

final Config config = new Config();
config.getNetworkConfig().setPort(5702);
config.setClusterName("test");
hzInstance = Hazelcast.newHazelcastInstance(config);

final ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().addAddress("127.0.0.1:5702");
clientConfig.setClusterName("test");
hzClient = HazelcastClient.newHazelcastClient(clientConfig);
}

@After
public void cleanUp() {
hzClient.shutdown();
hzInstance.shutdown();
}

@Test
public void shouldExportEventsAsProtobuf() throws Exception {
// given
final Ringbuffer<byte[]> buffer = hzClient.getRingbuffer(CONFIGURATION.getName());

var sequence = buffer.headSequence();

// when
client.newDeployCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();

// then
final var message = buffer.readOne(sequence);
assertThat(message).isNotNull();

final var record = Schema.Record.parseFrom(message);
assertThat(record.getRecord().is(Schema.DeploymentRecord.class)).isTrue();

final var deploymentRecord = record.getRecord().unpack(Schema.DeploymentRecord.class);
final Schema.DeploymentRecord.Resource resource = deploymentRecord.getResources(0);
assertThat(resource.getResourceName()).isEqualTo("process.bpmn");
}
}
7 changes: 7 additions & 0 deletions exporter/src/test/resources/application-cluster-name.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
zeebe:
broker:
exporters:
hazelcast:
className: io.zeebe.hazelcast.exporter.HazelcastExporter
args:
clusterName: test
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
zeebe:
broker:
exporters:
hazelcast:
className: io.zeebe.hazelcast.exporter.HazelcastExporter
args:
remoteAddress: 127.0.0.1:5702
clusterName: test