28
28
import java .util .ArrayList ;
29
29
import java .util .Collection ;
30
30
import java .util .List ;
31
+ import java .util .function .Supplier ;
31
32
import java .util .zip .GZIPInputStream ;
32
33
33
34
import org .kie .kogito .event .process .CloudEventVisitor ;
34
- import org .kie .kogito .event .process .KogitoEventBodySerializationHelper ;
35
35
import org .kie .kogito .event .process .KogitoMarshallEventSupport ;
36
36
import org .kie .kogito .event .process .MultipleProcessInstanceDataEvent ;
37
37
import org .kie .kogito .event .process .ProcessInstanceDataEvent ;
45
45
import org .kie .kogito .event .process .ProcessInstanceStateEventBody ;
46
46
import org .kie .kogito .event .process .ProcessInstanceVariableDataEvent ;
47
47
import org .kie .kogito .event .process .ProcessInstanceVariableEventBody ;
48
+ import org .slf4j .Logger ;
49
+ import org .slf4j .LoggerFactory ;
48
50
49
51
import com .fasterxml .jackson .core .JacksonException ;
50
52
import com .fasterxml .jackson .core .JsonParser ;
56
58
57
59
import io .cloudevents .SpecVersion ;
58
60
61
+ import static org .kie .kogito .event .process .KogitoEventBodySerializationHelper .readInt ;
62
+
59
63
public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer <MultipleProcessInstanceDataEvent > implements ResolvableDeserializer {
60
64
65
+ private static final Logger logger = LoggerFactory .getLogger (MultipleProcessInstanceDataEventDeserializer .class );
66
+
61
67
private JsonDeserializer <Object > defaultDeserializer ;
62
68
63
69
public MultipleProcessInstanceDataEventDeserializer (JsonDeserializer <Object > deserializer ) {
@@ -101,24 +107,31 @@ private static boolean isCompressed(JsonNode node) {
101
107
public static Collection <ProcessInstanceDataEvent <? extends KogitoMarshallEventSupport >> readFromBytes (byte [] binaryValue , boolean compressed ) throws IOException {
102
108
InputStream wrappedIn = new ByteArrayInputStream (binaryValue );
103
109
if (compressed ) {
110
+ logger .trace ("Gzip compressed byte array" );
104
111
wrappedIn = new GZIPInputStream (wrappedIn );
105
112
}
106
113
try (DataInputStream in = new DataInputStream (wrappedIn )) {
107
- int size = in .readShort ();
114
+ int size = readInt (in );
115
+ logger .trace ("Reading collection of size {}" , size );
108
116
Collection <ProcessInstanceDataEvent <? extends KogitoMarshallEventSupport >> result = new ArrayList <>(size );
109
117
List <ProcessInstanceDataEventExtensionRecord > infos = new ArrayList <>();
110
118
while (size -- > 0 ) {
111
119
byte readInfo = in .readByte ();
120
+ logger .trace ("Info ordinal is {}" , readInfo );
112
121
ProcessInstanceDataEventExtensionRecord info ;
113
122
if (readInfo == -1 ) {
114
123
info = new ProcessInstanceDataEventExtensionRecord ();
115
124
info .readEvent (in );
125
+ logger .trace ("Info readed is {}" , info );
116
126
infos .add (info );
117
127
} else {
118
128
info = infos .get (readInfo );
129
+ logger .trace ("Info cached is {}" , info );
119
130
}
120
131
String type = in .readUTF ();
132
+ logger .trace ("Type is {}" , info );
121
133
result .add (getCloudEvent (in , type , info ));
134
+ logger .trace ("{} events remaining" , size );
122
135
}
123
136
return result ;
124
137
}
@@ -127,31 +140,44 @@ public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventS
127
140
private static ProcessInstanceDataEvent <? extends KogitoMarshallEventSupport > getCloudEvent (DataInputStream in , String type , ProcessInstanceDataEventExtensionRecord info ) throws IOException {
128
141
switch (type ) {
129
142
case ProcessInstanceVariableDataEvent .VAR_TYPE :
130
- ProcessInstanceVariableDataEvent item = buildDataEvent (in , new ProcessInstanceVariableDataEvent (), new ProcessInstanceVariableEventBody () , info );
143
+ ProcessInstanceVariableDataEvent item = buildDataEvent (in , new ProcessInstanceVariableDataEvent (), ProcessInstanceVariableEventBody :: new , info );
131
144
item .setKogitoVariableName (item .getData ().getVariableName ());
132
145
return item ;
133
146
case ProcessInstanceStateDataEvent .STATE_TYPE :
134
- return buildDataEvent (in , new ProcessInstanceStateDataEvent (), new ProcessInstanceStateEventBody () , info );
147
+ return buildDataEvent (in , new ProcessInstanceStateDataEvent (), ProcessInstanceStateEventBody :: new , info );
135
148
case ProcessInstanceNodeDataEvent .NODE_TYPE :
136
- return buildDataEvent (in , new ProcessInstanceNodeDataEvent (), new ProcessInstanceNodeEventBody () , info );
149
+ return buildDataEvent (in , new ProcessInstanceNodeDataEvent (), ProcessInstanceNodeEventBody :: new , info );
137
150
case ProcessInstanceErrorDataEvent .ERROR_TYPE :
138
- return buildDataEvent (in , new ProcessInstanceErrorDataEvent (), new ProcessInstanceErrorEventBody () , info );
151
+ return buildDataEvent (in , new ProcessInstanceErrorDataEvent (), ProcessInstanceErrorEventBody :: new , info );
139
152
case ProcessInstanceSLADataEvent .SLA_TYPE :
140
- return buildDataEvent (in , new ProcessInstanceSLADataEvent (), new ProcessInstanceSLAEventBody () , info );
153
+ return buildDataEvent (in , new ProcessInstanceSLADataEvent (), ProcessInstanceSLAEventBody :: new , info );
141
154
default :
142
155
throw new UnsupportedOperationException ("Unrecognized event type " + type );
143
156
}
144
157
}
145
158
146
- private static <T extends ProcessInstanceDataEvent <V >, V extends KogitoMarshallEventSupport & CloudEventVisitor > T buildDataEvent (DataInput in , T cloudEvent , V body ,
159
+ private static <T extends ProcessInstanceDataEvent <V >, V extends KogitoMarshallEventSupport & CloudEventVisitor > T buildDataEvent (DataInput in , T cloudEvent , Supplier < V > bodySupplier ,
147
160
ProcessInstanceDataEventExtensionRecord info ) throws IOException {
148
- int delta = KogitoEventBodySerializationHelper .readInteger (in );
161
+ int delta = readInt (in );
162
+ logger .trace ("Time delta is {}" , delta );
149
163
cloudEvent .setTime (info .getTime ().plus (delta , ChronoUnit .MILLIS ));
150
164
KogitoDataEventSerializationHelper .readCloudEventAttrs (in , cloudEvent );
165
+ logger .trace ("Cloud event before population {}" , cloudEvent );
151
166
KogitoDataEventSerializationHelper .populateCloudEvent (cloudEvent , info );
152
- body .readEvent (in );
153
- body .visit (cloudEvent );
154
- cloudEvent .setData (body );
167
+ logger .trace ("Cloud event after population {}" , cloudEvent );
168
+
169
+ boolean isNotNull = in .readBoolean ();
170
+ if (isNotNull ) {
171
+ logger .trace ("Data is not null" );
172
+ V body = bodySupplier .get ();
173
+ body .readEvent (in );
174
+ logger .trace ("Event body before population {}" , body );
175
+ body .visit (cloudEvent );
176
+ logger .trace ("Event body after population {}" , body );
177
+ cloudEvent .setData (body );
178
+ } else {
179
+ logger .trace ("Data is null" );
180
+ }
155
181
return cloudEvent ;
156
182
}
157
183
0 commit comments