Skip to content

Commit 50dcd6c

Browse files
committed
[Fix #3721] Optimize event grouping
1 parent ac28d72 commit 50dcd6c

28 files changed

+1534
-41
lines changed

api/kogito-events-api/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,6 @@
4747
</dependency>
4848

4949
<!-- CloudEvents -->
50-
<dependency>
51-
<groupId>io.cloudevents</groupId>
52-
<artifactId>cloudevents-core</artifactId>
53-
</dependency>
5450
<!-- Tests -->
5551
<dependency>
5652
<groupId>org.junit.jupiter</groupId>
@@ -72,6 +68,10 @@
7268
<artifactId>slf4j-simple</artifactId>
7369
<scope>test</scope>
7470
</dependency>
71+
<dependency>
72+
<groupId>org.kie.kogito</groupId>
73+
<artifactId>kogito-jackson-utils</artifactId>
74+
</dependency>
7575
</dependencies>
7676

7777
</project>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.kie.kogito.event.process;
20+
21+
import org.kie.kogito.event.DataEvent;
22+
23+
public interface CloudEventVisitor {
24+
void visit(DataEvent<?> cloudEvent);
25+
}
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.kie.kogito.event.process;
20+
21+
import java.io.DataInput;
22+
import java.io.DataOutput;
23+
import java.io.IOException;
24+
import java.time.Instant;
25+
import java.time.OffsetDateTime;
26+
import java.time.ZoneOffset;
27+
import java.util.Collection;
28+
import java.util.Date;
29+
30+
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
31+
32+
import com.fasterxml.jackson.databind.JsonNode;
33+
34+
public class KogitoEventBodySerializationHelper {
35+
36+
private KogitoEventBodySerializationHelper() {
37+
}
38+
39+
public static String readUTF(DataInput in) throws IOException {
40+
boolean isNotNull = in.readBoolean();
41+
return isNotNull ? in.readUTF() : null;
42+
}
43+
44+
public static void writeUTF(DataOutput out, String string) throws IOException {
45+
if (string == null) {
46+
out.writeBoolean(false);
47+
} else {
48+
out.writeBoolean(true);
49+
out.writeUTF(string);
50+
}
51+
}
52+
53+
public static void writeDate(DataOutput out, Date date) throws IOException {
54+
if (date == null) {
55+
out.writeBoolean(false);
56+
} else {
57+
out.writeBoolean(true);
58+
out.writeLong(date.getTime());
59+
}
60+
}
61+
62+
public static Date readDate(DataInput in) throws IOException {
63+
boolean isNotNull = in.readBoolean();
64+
return isNotNull ? new Date(in.readLong()) : null;
65+
}
66+
67+
public static void writeTime(DataOutput out, OffsetDateTime date) throws IOException {
68+
if (date == null) {
69+
out.writeBoolean(false);
70+
} else {
71+
out.writeBoolean(true);
72+
out.writeLong(date.toInstant().toEpochMilli());
73+
}
74+
}
75+
76+
public static OffsetDateTime readTime(DataInput in) throws IOException {
77+
boolean isNotNull = in.readBoolean();
78+
return isNotNull ? Instant.ofEpochMilli(in.readLong()).atOffset(ZoneOffset.UTC) : null;
79+
}
80+
81+
public static void writeUTFCollection(DataOutput out, Collection<String> collection) throws IOException {
82+
if (collection == null) {
83+
writeInt(out, -1);
84+
} else {
85+
writeInt(out, collection.size());
86+
for (String item : collection) {
87+
writeUTF(out, item);
88+
}
89+
}
90+
}
91+
92+
public static <T extends Collection<String>> T readUTFCollection(DataInput in, T holder) throws IOException {
93+
int size = readInt(in);
94+
if (size == -1) {
95+
return null;
96+
}
97+
while (size-- > 0) {
98+
holder.add(readUTF(in));
99+
}
100+
return holder;
101+
}
102+
103+
private enum SerType {
104+
105+
NULL(KogitoEventBodySerializationHelper::writeNull, KogitoEventBodySerializationHelper::readNull),
106+
JSON(KogitoEventBodySerializationHelper::writeJson, KogitoEventBodySerializationHelper::readJson),
107+
DEFAULT(KogitoEventBodySerializationHelper::writeJson, KogitoEventBodySerializationHelper::readDefault),
108+
STRING(KogitoEventBodySerializationHelper::writeString, DataInput::readUTF),
109+
INT(KogitoEventBodySerializationHelper::writeInt, DataInput::readInt),
110+
SHORT(KogitoEventBodySerializationHelper::writeShort, DataInput::readShort),
111+
LONG(KogitoEventBodySerializationHelper::writeLong, DataInput::readLong),
112+
BYTE(KogitoEventBodySerializationHelper::writeByte, DataInput::readByte),
113+
BOOLEAN(KogitoEventBodySerializationHelper::writeBoolean, DataInput::readBoolean),
114+
FLOAT(KogitoEventBodySerializationHelper::writeFloat, DataInput::readFloat),
115+
DOUBLE(KogitoEventBodySerializationHelper::writeDouble, DataInput::readDouble);
116+
117+
final ObjectWriter writer;
118+
final ObjectReader reader;
119+
120+
SerType(ObjectWriter writer, ObjectReader reader) {
121+
this.writer = writer;
122+
this.reader = reader;
123+
}
124+
125+
ObjectWriter writer() {
126+
return writer;
127+
}
128+
129+
ObjectReader reader() {
130+
return reader;
131+
}
132+
133+
static SerType fromType(Class<?> type) {
134+
if (JsonNode.class.isAssignableFrom(type)) {
135+
return JSON;
136+
} else if (String.class.isAssignableFrom(type)) {
137+
return STRING;
138+
} else if (Boolean.class.isAssignableFrom(type)) {
139+
return BOOLEAN;
140+
} else if (Integer.class.isAssignableFrom(type)) {
141+
return INT;
142+
} else if (Short.class.isAssignableFrom(type)) {
143+
return SHORT;
144+
} else if (Byte.class.isAssignableFrom(type)) {
145+
return BYTE;
146+
} else if (Long.class.isAssignableFrom(type)) {
147+
return LONG;
148+
} else if (Float.class.isAssignableFrom(type)) {
149+
return FLOAT;
150+
} else if (Double.class.isAssignableFrom(type)) {
151+
return DOUBLE;
152+
} else {
153+
return DEFAULT;
154+
}
155+
}
156+
157+
static SerType fromObject(Object obj) {
158+
return obj == null ? NULL : fromType(obj.getClass());
159+
}
160+
}
161+
162+
private static void writeType(DataOutput out, SerType type) throws IOException {
163+
out.writeByte(type.ordinal());
164+
}
165+
166+
private static SerType readType(DataInput in) throws IOException {
167+
return SerType.values()[in.readByte()];
168+
}
169+
170+
public static void writeObject(DataOutput out, Object obj) throws IOException {
171+
SerType type = SerType.fromObject(obj);
172+
writeType(out, type);
173+
type.writer().accept(out, obj);
174+
}
175+
176+
public static Object readObject(DataInput in) throws IOException {
177+
return readType(in).reader().apply(in);
178+
}
179+
180+
@FunctionalInterface
181+
private static interface ObjectWriter {
182+
void accept(DataOutput out, Object obj) throws IOException;
183+
}
184+
185+
private static interface ObjectReader {
186+
Object apply(DataInput out) throws IOException;
187+
}
188+
189+
private static void writeString(DataOutput out, Object obj) throws IOException {
190+
out.writeUTF((String) obj);
191+
}
192+
193+
private static void writeBoolean(DataOutput out, Object obj) throws IOException {
194+
out.writeBoolean((Boolean) obj);
195+
}
196+
197+
private static void writeInt(DataOutput out, Object obj) throws IOException {
198+
out.writeInt((Integer) obj);
199+
}
200+
201+
private static void writeLong(DataOutput out, Object obj) throws IOException {
202+
out.writeInt((Integer) obj);
203+
}
204+
205+
private static void writeShort(DataOutput out, Object obj) throws IOException {
206+
out.writeShort((Short) obj);
207+
}
208+
209+
private static void writeByte(DataOutput out, Object obj) throws IOException {
210+
out.writeByte((Byte) obj);
211+
}
212+
213+
private static void writeFloat(DataOutput out, Object obj) throws IOException {
214+
out.writeFloat((Float) obj);
215+
}
216+
217+
private static void writeDouble(DataOutput out, Object obj) throws IOException {
218+
out.writeDouble((Double) obj);
219+
}
220+
221+
private static void writeNull(DataOutput out, Object obj) {
222+
// do nothing
223+
}
224+
225+
private static Object readNull(DataInput in) {
226+
return null;
227+
}
228+
229+
public static void writeInteger(DataOutput out, Integer integer) throws IOException {
230+
if (integer == null) {
231+
writeType(out, SerType.NULL);
232+
} else {
233+
writeInt(out, integer.intValue());
234+
}
235+
}
236+
237+
public static Integer readInteger(DataInput in) throws IOException {
238+
SerType type = readType(in);
239+
return type == SerType.NULL ? null : readInt(in, type);
240+
241+
}
242+
243+
private static void writeInt(DataOutput out, int size) throws IOException {
244+
if (size < Byte.MAX_VALUE) {
245+
writeType(out, SerType.BYTE);
246+
out.writeByte((byte) size);
247+
} else if (size < Short.MAX_VALUE) {
248+
writeType(out, SerType.SHORT);
249+
out.writeShort((short) size);
250+
} else {
251+
writeType(out, SerType.INT);
252+
out.writeInt(size);
253+
}
254+
}
255+
256+
private static int readInt(DataInput in) throws IOException {
257+
SerType type = readType(in);
258+
return readInt(in, type);
259+
}
260+
261+
private static int readInt(DataInput in, SerType type) throws IOException {
262+
switch (type) {
263+
case INT:
264+
return in.readInt();
265+
case SHORT:
266+
return in.readShort();
267+
case BYTE:
268+
return in.readByte();
269+
default:
270+
throw new IOException("Stream corrupted. Read unrecognized type " + type);
271+
}
272+
}
273+
274+
private static void writeJson(DataOutput out, Object obj) throws IOException {
275+
byte[] bytes = ObjectMapperFactory.get().writeValueAsBytes(obj);
276+
out.writeInt(bytes.length);
277+
out.write(bytes);
278+
}
279+
280+
private static Object readJson(DataInput in) throws IOException {
281+
return readJson(in, JsonNode.class);
282+
}
283+
284+
private static Object readDefault(DataInput in) throws IOException {
285+
return readJson(in, Object.class);
286+
}
287+
288+
private static Object readJson(DataInput in, Class<?> type) throws IOException {
289+
byte[] bytes = new byte[in.readInt()];
290+
in.readFully(bytes);
291+
return ObjectMapperFactory.get().readValue(bytes, type);
292+
}
293+
294+
public static Date toDate(OffsetDateTime time) {
295+
return time == null ? null : Date.from(time.toInstant());
296+
}
297+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.kie.kogito.event.process;
20+
21+
import java.io.DataInput;
22+
import java.io.DataOutput;
23+
import java.io.IOException;
24+
25+
public interface KogitoMarshallEventSupport {
26+
27+
void writeEvent(DataOutput out) throws IOException;
28+
29+
void readEvent(DataInput in) throws IOException;
30+
}

0 commit comments

Comments
 (0)