Skip to content

Commit 0be02c9

Browse files
committed
[Fix #3721] Refactoring KogitoIndexConverter
1 parent 5d36770 commit 0be02c9

11 files changed

+232
-33
lines changed

api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept
237237
public static Integer readInteger(DataInput in) throws IOException {
238238
SerType type = readType(in);
239239
return type == SerType.NULL ? null : readInt(in, type);
240-
241240
}
242241

243-
private static void writeInt(DataOutput out, int size) throws IOException {
242+
public static void writeInt(DataOutput out, int size) throws IOException {
244243
if (size < Byte.MAX_VALUE) {
245244
writeType(out, SerType.BYTE);
246245
out.writeByte((byte) size);
@@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException {
253252
}
254253
}
255254

256-
private static int readInt(DataInput in) throws IOException {
255+
public static int readInt(DataInput in) throws IOException {
257256
SerType type = readType(in);
258257
return readInt(in, type);
259258
}

api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.kie.kogito.event;
2020

21+
import java.io.IOException;
2122
import java.net.URI;
2223
import java.time.OffsetDateTime;
2324
import java.util.Optional;
@@ -43,6 +44,22 @@ public static <T> DataEvent<T> from(CloudEvent event, Converter<CloudEventData,
4344
return new CloudEventWrapDataEvent<>(event, dataUnmarshaller);
4445
}
4546

47+
public static <T extends AbstractDataEvent<V>, V> T from(T dataEvent, CloudEvent cloudEvent, Converter<CloudEventData, V> dataUnmarshaller) throws IOException {
48+
dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
49+
dataEvent.setId(cloudEvent.getId());
50+
dataEvent.setType(cloudEvent.getType());
51+
dataEvent.setSource(cloudEvent.getSource());
52+
dataEvent.setDataContentType(cloudEvent.getDataContentType());
53+
dataEvent.setDataSchema(cloudEvent.getDataSchema());
54+
dataEvent.setSubject(cloudEvent.getSubject());
55+
dataEvent.setTime(cloudEvent.getTime());
56+
cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName)));
57+
if (cloudEvent.getData() != null) {
58+
dataEvent.setData(dataUnmarshaller.convert(cloudEvent.getData()));
59+
}
60+
return dataEvent;
61+
}
62+
4663
public static <T> DataEvent<T> from(T eventData, String trigger, KogitoProcessInstance pi) {
4764
return from(eventData, trigger, URI.create("/process/" + pi.getProcessId()), Optional.empty(), ProcessMeta.fromKogitoProcessInstance(pi));
4865
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.kie.kogito.event.impl;
20+
21+
import java.io.IOException;
22+
23+
import org.kie.kogito.event.Converter;
24+
25+
import com.fasterxml.jackson.core.type.TypeReference;
26+
import com.fasterxml.jackson.databind.ObjectMapper;
27+
28+
import io.cloudevents.CloudEventData;
29+
30+
public class JacksonTypeCloudEventDataConverter<O> implements Converter<CloudEventData, O> {
31+
32+
private ObjectMapper objectMapper;
33+
private TypeReference<O> outputType;
34+
35+
public JacksonTypeCloudEventDataConverter(ObjectMapper objectMapper, TypeReference<O> outputType) {
36+
this.objectMapper = objectMapper;
37+
this.outputType = outputType;
38+
}
39+
40+
@Override
41+
public O convert(CloudEventData value) throws IOException {
42+
return objectMapper.readValue(value.toBytes(), outputType);
43+
}
44+
}

api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,4 @@ public boolean isCompressed() {
4242
public void setCompressed(boolean compressed) {
4343
addExtensionAttribute(COMPRESS_DATA, compressed);
4444
}
45-
4645
}

api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;
3333

34-
public class KogitoDataEventSerializationHelper {
34+
class KogitoDataEventSerializationHelper {
3535

3636
private KogitoDataEventSerializationHelper() {
3737
}
@@ -49,7 +49,10 @@ static <T extends AbstractDataEvent<?>> T readCloudEventAttrs(DataInput in, T da
4949
data.setId(in.readUTF());
5050
data.setSubject(readUTF(in));
5151
data.setDataContentType(readUTF(in));
52-
data.setDataSchema(URI.create(readUTF(in)));
52+
String dataSchema = readUTF(in);
53+
if (dataSchema != null) {
54+
data.setDataSchema(URI.create(dataSchema));
55+
}
5356
return data;
5457
}
5558

api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
public class MultipleProcessDataInstanceBeanDeserializerModifier extends BeanDeserializerModifier {
2929

30+
private static final long serialVersionUID = 1L;
31+
3032
@Override
3133
public JsonDeserializer<?> modifyDeserializer(
3234
DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.kie.kogito.event.serializer;
20+
21+
import java.io.IOException;
22+
import java.util.Base64;
23+
import java.util.Collection;
24+
25+
import org.kie.kogito.event.Converter;
26+
import org.kie.kogito.event.impl.JacksonTypeCloudEventDataConverter;
27+
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
28+
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
29+
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
30+
31+
import com.fasterxml.jackson.core.type.TypeReference;
32+
import com.fasterxml.jackson.databind.ObjectMapper;
33+
34+
import io.cloudevents.CloudEvent;
35+
import io.cloudevents.CloudEventData;
36+
37+
public class MultipleProcessDataInstanceConverterFactory {
38+
39+
private MultipleProcessDataInstanceConverterFactory() {
40+
}
41+
42+
public static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) {
43+
if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) {
44+
return isCompressed(cloudEvent) ? compressedConverter : binaryConverter;
45+
} else {
46+
return new JacksonTypeCloudEventDataConverter<>(objectMapper, new TypeReference<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>>() {
47+
});
48+
}
49+
}
50+
51+
private static boolean isCompressed(CloudEvent event) {
52+
Object value = event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
53+
return value instanceof Boolean ? ((Boolean) value).booleanValue() : false;
54+
}
55+
56+
private static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> binaryConverter =
57+
data -> deserialize(data, false);
58+
59+
private static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> compressedConverter =
60+
data -> deserialize(data, true);
61+
62+
private static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> deserialize(CloudEventData data, boolean compress) throws IOException {
63+
return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress);
64+
}
65+
}

api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import java.util.ArrayList;
2929
import java.util.Collection;
3030
import java.util.List;
31+
import java.util.function.Supplier;
3132
import java.util.zip.GZIPInputStream;
3233

3334
import org.kie.kogito.event.process.CloudEventVisitor;
34-
import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
3535
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
3636
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
3737
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
@@ -45,6 +45,8 @@
4545
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
4646
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
4747
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
4850

4951
import com.fasterxml.jackson.core.JacksonException;
5052
import com.fasterxml.jackson.core.JsonParser;
@@ -56,8 +58,12 @@
5658

5759
import io.cloudevents.SpecVersion;
5860

61+
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;
62+
5963
public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer<MultipleProcessInstanceDataEvent> implements ResolvableDeserializer {
6064

65+
private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);
66+
6167
private JsonDeserializer<Object> defaultDeserializer;
6268

6369
public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> deserializer) {
@@ -98,27 +104,34 @@ private static boolean isCompressed(JsonNode node) {
98104
return compress != null && compress.isBoolean() ? compress.asBoolean() : false;
99105
}
100106

101-
public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
107+
static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
102108
InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
103109
if (compressed) {
110+
logger.trace("Gzip compressed byte array");
104111
wrappedIn = new GZIPInputStream(wrappedIn);
105112
}
106113
try (DataInputStream in = new DataInputStream(wrappedIn)) {
107-
int size = in.readShort();
114+
int size = readInt(in);
115+
logger.trace("Reading collection of size {}", size);
108116
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> result = new ArrayList<>(size);
109117
List<ProcessInstanceDataEventExtensionRecord> infos = new ArrayList<>();
110118
while (size-- > 0) {
111119
byte readInfo = in.readByte();
120+
logger.trace("Info ordinal is {}", readInfo);
112121
ProcessInstanceDataEventExtensionRecord info;
113122
if (readInfo == -1) {
114123
info = new ProcessInstanceDataEventExtensionRecord();
115124
info.readEvent(in);
125+
logger.trace("Info readed is {}", info);
116126
infos.add(info);
117127
} else {
118128
info = infos.get(readInfo);
129+
logger.trace("Info cached is {}", info);
119130
}
120131
String type = in.readUTF();
132+
logger.trace("Type is {}", info);
121133
result.add(getCloudEvent(in, type, info));
134+
logger.trace("{} events remaining", size);
122135
}
123136
return result;
124137
}
@@ -127,31 +140,44 @@ public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventS
127140
private static ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException {
128141
switch (type) {
129142
case ProcessInstanceVariableDataEvent.VAR_TYPE:
130-
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info);
143+
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info);
131144
item.setKogitoVariableName(item.getData().getVariableName());
132145
return item;
133146
case ProcessInstanceStateDataEvent.STATE_TYPE:
134-
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info);
147+
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info);
135148
case ProcessInstanceNodeDataEvent.NODE_TYPE:
136-
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info);
149+
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info);
137150
case ProcessInstanceErrorDataEvent.ERROR_TYPE:
138-
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info);
151+
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info);
139152
case ProcessInstanceSLADataEvent.SLA_TYPE:
140-
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info);
153+
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info);
141154
default:
142155
throw new UnsupportedOperationException("Unrecognized event type " + type);
143156
}
144157
}
145158

146-
private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body,
159+
private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier<V> bodySupplier,
147160
ProcessInstanceDataEventExtensionRecord info) throws IOException {
148-
int delta = KogitoEventBodySerializationHelper.readInteger(in);
161+
int delta = readInt(in);
162+
logger.trace("Time delta is {}", delta);
149163
cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
150164
KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
165+
logger.trace("Cloud event before population {}", cloudEvent);
151166
KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info);
152-
body.readEvent(in);
153-
body.visit(cloudEvent);
154-
cloudEvent.setData(body);
167+
logger.trace("Cloud event after population {}", cloudEvent);
168+
169+
boolean isNotNull = in.readBoolean();
170+
if (isNotNull) {
171+
logger.trace("Data is not null");
172+
V body = bodySupplier.get();
173+
body.readEvent(in);
174+
logger.trace("Event body before population {}", body);
175+
body.visit(cloudEvent);
176+
logger.trace("Event body after population {}", body);
177+
cloudEvent.setData(body);
178+
} else {
179+
logger.trace("Data is null");
180+
}
155181
return cloudEvent;
156182
}
157183

api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,22 @@
2626
import java.util.Map;
2727
import java.util.zip.GZIPOutputStream;
2828

29-
import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
3029
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
3130
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
3231
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3334

3435
import com.fasterxml.jackson.core.JsonGenerator;
3536
import com.fasterxml.jackson.databind.JsonSerializer;
3637
import com.fasterxml.jackson.databind.SerializerProvider;
3738

39+
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;
40+
3841
public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer<MultipleProcessInstanceDataEvent> {
3942

43+
private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);
44+
4045
private JsonSerializer<Object> defaultSerializer;
4146

4247
public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> serializer) {
@@ -67,23 +72,41 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen,
6772
private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
6873
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
6974
try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
70-
out.writeShort(data.size());
75+
logger.trace("Writing size {}", data.size());
76+
writeInt(out, data.size());
7177
Map<String, ProcessInstanceDataEventExtensionRecord> infos = new HashMap<>();
7278
for (ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> cloudEvent : data) {
7379
String key = cloudEvent.getKogitoProcessInstanceId();
7480
ProcessInstanceDataEventExtensionRecord info = infos.get(key);
7581
if (info == null) {
76-
out.writeByte(-1);
82+
logger.trace("Writing marker byte -1");
83+
out.writeByte((byte) -1);
7784
info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
85+
logger.trace("Writing info", info);
7886
info.writeEvent(out);
7987
infos.put(key, info);
8088
} else {
89+
logger.trace("Writing marker byte {}", info.getOrdinal());
8190
out.writeByte((byte) info.getOrdinal());
8291
}
92+
logger.trace("Writing type {}", cloudEvent.getType());
8393
out.writeUTF(cloudEvent.getType());
84-
KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime()));
94+
int timeDelta = cloudEvent.getTime().compareTo(info.getTime());
95+
logger.trace("Writing time delta {}", timeDelta);
96+
writeInt(out, timeDelta);
97+
logger.trace("Writing cloud event attrs {}", cloudEvent);
8598
KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent);
86-
cloudEvent.getData().writeEvent(out);
99+
KogitoMarshallEventSupport itemData = cloudEvent.getData();
100+
if (itemData != null) {
101+
logger.trace("Writing data not null boolean");
102+
out.writeBoolean(true);
103+
logger.trace("Writing cloud event body {}", itemData);
104+
itemData.writeEvent(out);
105+
} else {
106+
logger.trace("Writing data null boolean");
107+
out.writeBoolean(false);
108+
}
109+
logger.trace("individual event writing completed");
87110
}
88111
}
89112
return bytesOut.toByteArray();

0 commit comments

Comments
 (0)