Skip to content

Commit f1f86a2

Browse files
feat(protocol): add CommandDistributionRecord implementation
Adds the implementation of the CommandDistributionRecord.
1 parent 818d957 commit f1f86a2

File tree

3 files changed

+191
-1
lines changed

3 files changed

+191
-1
lines changed

broker/src/main/java/io/camunda/zeebe/broker/transport/commandapi/CommandApiRequestReader.java

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
1616
import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord;
1717
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
18+
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
1819
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
1920
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
2021
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
@@ -49,6 +50,7 @@ public class CommandApiRequestReader implements RequestReader<ExecuteCommandRequ
4950
RECORDS_BY_TYPE.put(
5051
ValueType.PROCESS_INSTANCE_MODIFICATION, ProcessInstanceModificationRecord::new);
5152
RECORDS_BY_TYPE.put(ValueType.SIGNAL, SignalRecord::new);
53+
RECORDS_BY_TYPE.put(ValueType.COMMAND_DISTRIBUTION, CommandDistributionRecord::new);
5254
}
5355

5456
private UnifiedRecordValue value;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
3+
* one or more contributor license agreements. See the NOTICE file distributed
4+
* with this work for additional information regarding copyright ownership.
5+
* Licensed under the Zeebe Community License 1.1. You may not use this file
6+
* except in compliance with the Zeebe Community License 1.1.
7+
*/
8+
package io.camunda.zeebe.protocol.impl.record.value.distribution;
9+
10+
import io.camunda.zeebe.msgpack.MsgpackException;
11+
import io.camunda.zeebe.msgpack.property.EnumProperty;
12+
import io.camunda.zeebe.msgpack.property.IntegerProperty;
13+
import io.camunda.zeebe.msgpack.property.ObjectProperty;
14+
import io.camunda.zeebe.msgpack.spec.MsgPackReader;
15+
import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
16+
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
17+
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
18+
import io.camunda.zeebe.protocol.record.RecordValue;
19+
import io.camunda.zeebe.protocol.record.ValueType;
20+
import io.camunda.zeebe.protocol.record.value.CommandDistributionRecordValue;
21+
import java.util.EnumMap;
22+
import java.util.Map;
23+
import java.util.function.Supplier;
24+
import org.agrona.concurrent.UnsafeBuffer;
25+
26+
public final class CommandDistributionRecord extends UnifiedRecordValue
27+
implements CommandDistributionRecordValue {
28+
29+
private static final Map<ValueType, Supplier<UnifiedRecordValue>> RECORDS_BY_TYPE =
30+
new EnumMap<>(ValueType.class);
31+
32+
// You'll need to register any of the records value's that you want to distribute
33+
static {
34+
RECORDS_BY_TYPE.put(ValueType.DEPLOYMENT, DeploymentRecord::new);
35+
}
36+
37+
private final IntegerProperty partitionIdProperty = new IntegerProperty("partitionId");
38+
private final EnumProperty<ValueType> valueTypeProperty =
39+
new EnumProperty<>("valueType", ValueType.class);
40+
private final ObjectProperty<UnifiedRecordValue> commandValueProperty =
41+
new ObjectProperty<>("commandValue", new UnifiedRecordValue());
42+
private final MsgPackWriter recordValueWriter = new MsgPackWriter();
43+
private final MsgPackReader recordValueReader = new MsgPackReader();
44+
45+
public CommandDistributionRecord() {
46+
declareProperty(partitionIdProperty)
47+
.declareProperty(valueTypeProperty)
48+
.declareProperty(commandValueProperty);
49+
}
50+
51+
@Override
52+
public int getPartitionId() {
53+
return partitionIdProperty.getValue();
54+
}
55+
56+
@Override
57+
public ValueType getValueType() {
58+
return valueTypeProperty.getValue();
59+
}
60+
61+
@Override
62+
public RecordValue getCommandValue() {
63+
// fetch a concrete instance of the record value by type
64+
if (!valueTypeProperty.hasValue()) {
65+
throw new MsgpackException("Expected to read the value type property, but it's not yet set");
66+
}
67+
final var valueType = getValueType();
68+
final var concrecteRecordValueSupplier = RECORDS_BY_TYPE.get(valueType);
69+
if (concrecteRecordValueSupplier == null) {
70+
throw new IllegalStateException(
71+
"Expected to read the record value, but it's type `"
72+
+ valueType.name()
73+
+ "` is unknown. Please add it to RecordDistributionRecord.RECORDS_BY_TYPE");
74+
}
75+
final var concreteRecordValue = concrecteRecordValueSupplier.get();
76+
77+
// write the record value property's content into a buffer
78+
final var storedRecordValue = commandValueProperty.getValue();
79+
final var recordValueBuffer = new UnsafeBuffer(0, 0);
80+
final int encodedLength = storedRecordValue.getEncodedLength();
81+
recordValueBuffer.wrap(new byte[encodedLength]);
82+
storedRecordValue.write(recordValueWriter.wrap(recordValueBuffer, 0));
83+
84+
// read the value back from the buffer into the concrete record value
85+
concreteRecordValue.wrap(recordValueBuffer);
86+
return concreteRecordValue;
87+
}
88+
89+
public CommandDistributionRecord setValueType(final ValueType valueType) {
90+
valueTypeProperty.setValue(valueType);
91+
return this;
92+
}
93+
94+
public CommandDistributionRecord setPartitionId(final int partitionId) {
95+
partitionIdProperty.setValue(partitionId);
96+
return this;
97+
}
98+
99+
public CommandDistributionRecord setRecordValue(final UnifiedRecordValue recordValue) {
100+
// inspired by IndexedRecord.setValue
101+
final var valueBuffer = new UnsafeBuffer(0, 0);
102+
final int encodedLength = recordValue.getLength();
103+
valueBuffer.wrap(new byte[encodedLength]);
104+
105+
recordValue.write(valueBuffer, 0);
106+
commandValueProperty.getValue().read(recordValueReader.wrap(valueBuffer, 0, encodedLength));
107+
return this;
108+
}
109+
}

protocol-impl/src/test/java/io/camunda/zeebe/protocol/impl/JsonSerializableToJsonTest.java

+80-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentDistributionRecord;
2424
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
2525
import io.camunda.zeebe.protocol.impl.record.value.deployment.ProcessRecord;
26+
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
2627
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
2728
import io.camunda.zeebe.protocol.impl.record.value.escalation.EscalationRecord;
2829
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
@@ -1188,7 +1189,85 @@ final var record =
11881189
"resourceKey":1
11891190
}
11901191
"""
1191-
}
1192+
},
1193+
1194+
/////////////////////////////////////////////////////////////////////////////////////////////
1195+
///////////////////////////////// CommandDistributionRecord //////////////////////////////////
1196+
/////////////////////////////////////////////////////////////////////////////////////////////
1197+
{
1198+
"CommandDistributionRecord",
1199+
(Supplier<UnifiedRecordValue>)
1200+
() -> {
1201+
final var deploymentRecord = new DeploymentRecord();
1202+
deploymentRecord
1203+
.resources()
1204+
.add()
1205+
.setResourceName("my_first_bpmn.bpmn")
1206+
.setResource(wrapString("This is the contents of the BPMN"));
1207+
deploymentRecord
1208+
.processesMetadata()
1209+
.add()
1210+
.setKey(123)
1211+
.setVersion(1)
1212+
.setBpmnProcessId("my_first_process")
1213+
.setResourceName("my_first_bpmn.bpmn")
1214+
.setChecksum(wrapString("sha1"));
1215+
1216+
return new CommandDistributionRecord()
1217+
.setPartitionId(1)
1218+
.setValueType(ValueType.DEPLOYMENT)
1219+
.setRecordValue(deploymentRecord);
1220+
},
1221+
"""
1222+
{
1223+
"partitionId": 1,
1224+
"valueType": "DEPLOYMENT",
1225+
"commandValue": {
1226+
"resources": [{
1227+
"resource": "VGhpcyBpcyB0aGUgY29udGVudHMgb2YgdGhlIEJQTU4=",
1228+
"resourceName": "my_first_bpmn.bpmn"
1229+
}],
1230+
"processesMetadata": [{
1231+
"processDefinitionKey": 123,
1232+
"version": 1,
1233+
"bpmnProcessId": "my_first_process",
1234+
"resourceName": "my_first_bpmn.bpmn",
1235+
"checksum": "c2hhMQ==",
1236+
"duplicate": false
1237+
}],
1238+
"decisionsMetadata": [],
1239+
"decisionRequirementsMetadata": []
1240+
}
1241+
}
1242+
"""
1243+
},
1244+
1245+
/////////////////////////////////////////////////////////////////////////////////////////////
1246+
///////////////////////////////// Empty CommandDistributionRecord ////////////////////////////
1247+
/////////////////////////////////////////////////////////////////////////////////////////////
1248+
{
1249+
"Empty CommandDistributionRecord",
1250+
(Supplier<UnifiedRecordValue>)
1251+
() -> {
1252+
final var deploymentRecord = new DeploymentRecord();
1253+
return new CommandDistributionRecord()
1254+
.setPartitionId(1)
1255+
.setValueType(ValueType.DEPLOYMENT)
1256+
.setRecordValue(deploymentRecord);
1257+
},
1258+
"""
1259+
{
1260+
"partitionId": 1,
1261+
"valueType": "DEPLOYMENT",
1262+
"commandValue": {
1263+
"resources": [],
1264+
"processesMetadata": [],
1265+
"decisionsMetadata": [],
1266+
"decisionRequirementsMetadata": []
1267+
}
1268+
}
1269+
"""
1270+
},
11921271
};
11931272
}
11941273

0 commit comments

Comments
 (0)