Skip to content

bugfix: Support serialization for dm.jdbc.driver.DmdbTimestamp (#6701) #6728

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6707](https://github.com/apache/incubator-seata/pull/6707)] fix readonly branch commit errors in Oracle XA transactions
- [[#6711](https://github.com/apache/incubator-seata/pull/6711)] fix dameng rollback info un compress fail
- [[#6714](https://github.com/apache/incubator-seata/pull/6714)] fix dameng delete undo fail
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] fix support serialization for dm.jdbc.driver.DmdbTimestamp


### optimize:
Expand Down Expand Up @@ -75,6 +76,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [liuqiufeng](https://github.com/liuqiufeng)
- [caohdgege](https://github.com/caohdgege)
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 3 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [[#6707](https://github.com/apache/incubator-seata/pull/6707)] 修复Oracle XA事务中只读分支提交出错的问题
- [[#6711](https://github.com/apache/incubator-seata/pull/6711)] 修复达梦数据库的getRollbackInfo没有解压缩的问题
- [[#6714](https://github.com/apache/incubator-seata/pull/6714)] 修复达梦数据库的delete sql回滚失败的问题
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] 修复达梦数据库的对dm.jdbc.driver.DmdbTimestamp的支持

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
Expand Down Expand Up @@ -79,6 +80,8 @@
- [liuqiufeng](https://github.com/liuqiufeng)
- [caohdgege](https://github.com/caohdgege)
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)



同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Expand Down
5 changes: 5 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,10 @@
<artifactId>commons-logging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -71,6 +73,10 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize {

private static final Logger LOGGER = LoggerFactory.getLogger(JacksonUndoLogParser.class);

private static final String DM_JDBC_DRIVER_DMDB_TIMESTAMP = "dm.jdbc.driver.DmdbTimestamp";

private static final String VALUE_OF = "valueOf";

/**
* the zoneId for LocalDateTime
*/
Expand Down Expand Up @@ -120,6 +126,16 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize {
*/
private final JsonDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer();

/**
* customize serializer for dm.jdbc.driver.DmdbTimestamp
*/
private final JsonSerializer dmdbTimestampSerializer = new DmdbTimestampSerializer();

/**
* customize deserializer for dm.jdbc.driver.DmdbTimestamp
*/
private final JsonDeserializer dmdbTimestampDeserializer = new DmdbTimestampDeserializer();

@Override
public void init() {
try {
Expand Down Expand Up @@ -152,12 +168,25 @@ public void init() {
module.addDeserializer(SerialClob.class, clobDeserializer);
module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);
registerDmdbTimestampModuleIfPresent();
mapper.registerModule(module);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
mapper.enable(MapperFeature.PROPAGATE_TRANSIENT_MARKER);
}

private void registerDmdbTimestampModuleIfPresent() {
try {
Class<?> dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP);
module.addSerializer(dmdbTimestampClass, dmdbTimestampSerializer);
module.addDeserializer(dmdbTimestampClass, dmdbTimestampDeserializer);
} catch (ClassNotFoundException e) {
// If the DmdbTimestamp class is not found, the serializers and deserializers will not be registered.
// This is expected behavior since not all environments will have the dm.jdbc.driver.DmdbTimestamp class.
// Therefore, no error log is recorded to avoid confusion for users without the dm driver.
}
}

@Override
public String getName() {
return NAME;
Expand Down Expand Up @@ -395,6 +424,98 @@ public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) thro
}
}

private static class DmdbTimestampSerializer extends JsonSerializer<Object> {

private static final String TO_INSTANT = "toInstant";
private static final String GET_NANOS = "getNanos";

@Override
public void serializeWithType(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers, TypeSerializer typeSer) throws IOException {
JsonToken valueShape = JsonToken.VALUE_NUMBER_INT;
int nanos = getNanos(dmdbTimestamp);
if (nanos % 1000000 > 0) {
valueShape = JsonToken.START_ARRAY;
}

WritableTypeId typeIdDef = typeSer.writeTypePrefix(gen, typeSer.typeId(dmdbTimestamp, valueShape));
serialize(dmdbTimestamp, gen, serializers);
typeSer.writeTypeSuffix(gen, typeIdDef);
}

@Override
public void serialize(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers) {
try {
Instant instant = getInstant(dmdbTimestamp);
gen.writeNumber(instant.toEpochMilli());
// if has microseconds, serialized as an array, write the nano to the array
int nanos = instant.getNano();
if (nanos % 1000000 > 0) {
gen.writeNumber(nanos);
}
} catch (Exception e) {
LOGGER.error("serialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e);
}
}

private int getNanos(Object dmdbTimestamp) throws IOException {
try {
Method getNanosMethod = dmdbTimestamp.getClass().getMethod(GET_NANOS);
return (int) getNanosMethod.invoke(dmdbTimestamp);
} catch (Exception e) {
throw new IOException("Error getting nanos value from DmdbTimestamp", e);
}
}

private Instant getInstant(Object dmdbTimestamp) throws IOException {
try {
Method toInstantMethod = dmdbTimestamp.getClass().getMethod(TO_INSTANT);
return (Instant) toInstantMethod.invoke(dmdbTimestamp);
} catch (Exception e) {
throw new IOException("Error getting instant from DmdbTimestamp", e);
}
}
}

public class DmdbTimestampDeserializer extends JsonDeserializer<Object> {

@Override
public Object deserialize(JsonParser p, DeserializationContext ctxt) {
try {
Instant instant = parseInstant(p);
return createDmdbTimestamp(instant);
} catch (Exception e) {
LOGGER.error("deserialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e);
}
return null;
}

private Instant parseInstant(JsonParser p) throws IOException {
try {
if (p.isExpectedStartArrayToken()) {
ArrayNode arrayNode = p.getCodec().readTree(p);
long timestamp = arrayNode.get(0).asLong();
Instant instant = Instant.ofEpochMilli(timestamp);
if (arrayNode.size() > 1) {
int nano = arrayNode.get(1).asInt();
instant = instant.plusNanos(nano % 1000000);
}
return instant;
} else {
long timestamp = p.getLongValue();
return Instant.ofEpochMilli(timestamp);
}
} catch (IOException e) {
throw new IOException("Error parsing Instant from JSON", e);
}
}

private Object createDmdbTimestamp(Instant instant) throws Exception {
Class<?> dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP);
Method valueOfMethod = dmdbTimestampClass.getMethod(VALUE_OF, ZonedDateTime.class);
return valueOfMethod.invoke(null, instant.atZone(zoneId));
}
}

/**
* set zone id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
package org.apache.seata.rm.datasource.undo.parser;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;

Expand All @@ -32,6 +38,7 @@
import org.apache.seata.rm.datasource.undo.BaseUndoLogParserTest;
import org.apache.seata.rm.datasource.undo.UndoLogParser;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;


Expand Down Expand Up @@ -99,6 +106,46 @@ public void encode() throws NoSuchFieldException, IllegalAccessException, IOExce
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

}

@Test
public void testSerializeAndDeserializeDmdbTimestamp() throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IOException {
java.lang.reflect.Field reflectField = parser.getClass().getDeclaredField("mapper");
reflectField.setAccessible(true);
ObjectMapper mapper = (ObjectMapper)reflectField.get(parser);

Class<?> dmdbTimestampClass = Class.forName("dm.jdbc.driver.DmdbTimestamp");
Method valueOfMethod = dmdbTimestampClass.getMethod("valueOf", ZonedDateTime.class);
Method valueOfDateMethod = dmdbTimestampClass.getMethod("valueOf", Date.class);

Object dmdbTimestamp = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).atZone(ZoneId.systemDefault()));
Field field = new Field("dmdb_timestamp", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestamp);
byte[] bytes = mapper.writeValueAsBytes(field);
Field sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

Object originalTimestamp = valueOfDateMethod.invoke(null, new Date(1721985847000L));
field = new Field("dmdb_timestamp_type2", JDBCType.TIMESTAMP.getVendorTypeNumber(), originalTimestamp);
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertFalse(DataCompareUtils.isFieldEquals(field, sameField).getResult());

Object dmdbTimestampnanos = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).plusNanos(12345L).atZone(ZoneId.systemDefault()));
field = new Field("dmdb_timestamp_nanos", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestampnanos);
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

}

private boolean checkClassExists(String className) {
try {
Class.forName(className);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

@Override
Expand Down
Loading