Skip to content

Commit 2a7dc88

Browse files
authored
optimize: load SeataSerializer by version (#6208)
1 parent 44caf1d commit 2a7dc88

File tree

37 files changed

+260
-89
lines changed

37 files changed

+260
-89
lines changed

changes/en-us/2.x.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Add changes here for all PR submitted to the 2.x branch.
1111

1212
### optimize:
1313
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
14+
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
1415

1516
### refactor:
1617
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response
@@ -31,5 +32,6 @@ Thanks to these contributors for their code commits. Please report an unintended
3132
- [tuwenlin](https://github.com/tuwenlin)
3233
- [YeonCheolGit](https://github.com/YeonCheolGit)
3334
- [liuqiufeng](https://github.com/liuqiufeng)
35+
- [Bughue](https://github.com/Bughue)
3436

3537
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

changes/zh-cn/2.x.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
### optimize:
1212
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
13+
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
14+
1315

1416
### refactor:
1517
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
@@ -27,5 +29,6 @@
2729
- [tuwenlin](https://github.com/tuwenlin)
2830
- [YeonCheolGit](https://github.com/YeonCheolGit)
2931
- [liuqiufeng](https://github.com/liuqiufeng)
32+
- [Bughue](https://github.com/Bughue)
3033

3134
同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,20 @@ public interface ProtocolConstants {
3232
*/
3333
byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};
3434

35+
/**
36+
* Old protocol version
37+
*/
38+
byte VERSION_0 = 0;
39+
40+
/**
41+
* Protocol version
42+
*/
43+
byte VERSION_1 = 1;
44+
3545
/**
3646
* Protocol version
3747
*/
38-
byte VERSION = 1;
48+
byte VERSION = VERSION_1;
3949

4050
/**
4151
* Max frame length

core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public Object decodeFrame(ByteBuf frame) {
152152
bs = compressor.decompress(bs);
153153
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
154154
if (this.supportDeSerializerTypes.contains(protocolType)) {
155-
Serializer serializer = SerializerServiceLoader.load(protocolType);
155+
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
156156
rpcMessage.setBody(serializer.deserialize(bs));
157157
} else {
158158
throw new IllegalArgumentException("SerializerType not match: " + protocolType.name());

core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
9393
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
9494
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
9595
// heartbeat has no body
96-
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
96+
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
9797
bodyBytes = serializer.serialize(rpcMessage.getBody());
9898
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
9999
bodyBytes = compressor.compress(bodyBytes);

core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,26 @@
3535
import static org.apache.seata.core.serializer.SerializerType.PROTOBUF;
3636
import static org.apache.seata.core.serializer.SerializerType.SEATA;
3737

38+
import java.util.HashMap;
39+
import java.util.Map;
40+
3841
/**
3942
* The Service Loader for the interface {@link Serializer}
40-
*
4143
*/
4244
public final class SerializerServiceLoader {
4345

4446
private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class);
4547
private static final Configuration CONFIG = ConfigurationFactory.getInstance();
4648

47-
private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[] {SEATA, PROTOBUF, KRYO, HESSIAN};
49+
private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[]{SEATA, PROTOBUF, KRYO, HESSIAN};
50+
51+
private final static Map<String, Serializer> SERIALIZER_MAP = new HashMap<>();
4852

4953
private static final String SPLIT_CHAR = ",";
5054

5155
private SerializerServiceLoader() {
5256
}
5357

54-
5558
private static final String PROTOBUF_SERIALIZER_CLASS_NAME = "org.apache.seata.serializer.protobuf.ProtobufSerializer";
5659

5760
/**
@@ -61,7 +64,7 @@ private SerializerServiceLoader() {
6164
* @return the service of {@link Serializer}
6265
* @throws EnhancedServiceNotFoundException the enhanced service not found exception
6366
*/
64-
public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException {
67+
public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
6568
if (type == SerializerType.PROTOBUF) {
6669
try {
6770
ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME);
@@ -70,9 +73,28 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
7073
"Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency ", e);
7174
}
7275
}
73-
return EnhancedServiceLoader.load(Serializer.class, type.name());
76+
77+
String key = serialzerKey(type, version);
78+
Serializer serializer = SERIALIZER_MAP.get(key);
79+
if (serializer == null) {
80+
if (type == SerializerType.SEATA) {
81+
serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version});
82+
} else {
83+
serializer = EnhancedServiceLoader.load(Serializer.class, type.name());
84+
}
85+
SERIALIZER_MAP.put(key, serializer);
86+
}
87+
return serializer;
7488
}
7589

90+
private static String serialzerKey(SerializerType type, byte version) {
91+
if (type == SerializerType.SEATA) {
92+
return type.name() + version;
93+
}
94+
return type.name();
95+
}
96+
97+
7698
public static List<SerializerType> getSupportedSerializers() {
7799
List<SerializerType> supportedSerializers = new ArrayList<>();
78100
String defaultSupportSerializers = Arrays.stream(DEFAULT_SERIALIZER_TYPE).map(SerializerType::name).collect(Collectors.joining(SPLIT_CHAR));
@@ -93,4 +115,4 @@ public static SerializerType getDefaultSerializerType() {
93115
return getSupportedSerializers().get(0);
94116
}
95117

96-
}
118+
}

serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080

8181
/**
8282
* The type Message codec factory.
83-
*
8483
*/
8584
public class MessageCodecFactory {
8685

@@ -95,8 +94,8 @@ public class MessageCodecFactory {
9594
* @param abstractMessage the abstract message
9695
* @return the message codec
9796
*/
98-
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage) {
99-
return getMessageCodec(abstractMessage.getTypeCode());
97+
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage, byte version) {
98+
return getMessageCodec(abstractMessage.getTypeCode(), version);
10099
}
101100

102101
/**
@@ -105,14 +104,14 @@ public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage)
105104
* @param typeCode the type code
106105
* @return the msg instance by code
107106
*/
108-
public static MessageSeataCodec getMessageCodec(short typeCode) {
107+
public static MessageSeataCodec getMessageCodec(short typeCode, byte version) {
109108
MessageSeataCodec msgCodec = null;
110109
switch (typeCode) {
111110
case MessageType.TYPE_SEATA_MERGE:
112-
msgCodec = new MergedWarpMessageCodec();
111+
msgCodec = new MergedWarpMessageCodec(version);
113112
break;
114113
case MessageType.TYPE_SEATA_MERGE_RESULT:
115-
msgCodec = new MergeResultMessageCodec();
114+
msgCodec = new MergeResultMessageCodec(version);
116115
break;
117116
case MessageType.TYPE_REG_CLT:
118117
msgCodec = new RegisterTMRequestCodec();
@@ -136,7 +135,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
136135
msgCodec = new GlobalReportRequestCodec();
137136
break;
138137
case MessageType.TYPE_BATCH_RESULT_MSG:
139-
msgCodec = new BatchResultMessageCodec();
138+
msgCodec = new BatchResultMessageCodec(version);
140139
break;
141140
default:
142141
break;
@@ -147,15 +146,15 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
147146
}
148147

149148
try {
150-
msgCodec = getMergeRequestMessageSeataCodec(typeCode);
149+
msgCodec = getMergeRequestMessageSeataCodec(typeCode, version);
151150
} catch (Exception exx) {
152151
}
153152

154153
if (msgCodec != null) {
155154
return msgCodec;
156155
}
157156

158-
msgCodec = getMergeResponseMessageSeataCodec(typeCode);
157+
msgCodec = getMergeResponseMessageSeataCodec(typeCode, version);
159158

160159
return msgCodec;
161160
}
@@ -166,7 +165,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
166165
* @param typeCode the type code
167166
* @return the merge request instance by code
168167
*/
169-
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode) {
168+
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode, byte version) {
170169
switch (typeCode) {
171170
case MessageType.TYPE_GLOBAL_BEGIN:
172171
return new GlobalBeginRequestCodec();
@@ -195,7 +194,7 @@ protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode
195194
* @param typeCode the type code
196195
* @return the merge response instance by code
197196
*/
198-
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode) {
197+
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode, byte version) {
199198
switch (typeCode) {
200199
case MessageType.TYPE_GLOBAL_BEGIN_RESULT:
201200
return new GlobalBeginResponseCodec();

0 commit comments

Comments
 (0)