diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Buf.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Buf.java index ecea29568..e3050c815 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Buf.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Buf.java @@ -24,6 +24,13 @@ public interface Buf { void writeInt(int i); void writeLong(long l); + void write(int pos, byte b); + void write(int pos, byte[] b); + void write(int srcPos, byte[] b, int pos, int length); + void writeShort(int pos, short i); + void writeInt(int pos, int i); + void writeLong(int pos, long l); + byte peek(); int peekInt(); long peekLong(); @@ -31,9 +38,17 @@ public interface Buf { byte read(); byte[] read(int length); void read(byte[] b, int pos, int length); + short readShort(); int readInt(); long readLong(); + byte readAt(int pos); + byte[] readAt(int pos, int length); + void readAt(int srcPos, byte[] b, int pos, int length); + short readShortAt(int pos); + int readIntAt(int pos); + long readLongAt(int pos); + void reverseWrite(byte b); byte reverseRead(); void reverseWriteInt(int i); @@ -44,6 +59,9 @@ public interface Buf { void reverseSkipInt(); void ensureRemainder(int length); void resize(int oldSize, int newSize); + void setForwardOffset(int pos); + int restReadableSize(); + int readOffset(); boolean isEnd(); diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/BufImpl.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/BufImpl.java index 5d3e14a13..6e7119e59 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/BufImpl.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/BufImpl.java @@ -22,6 +22,10 @@ public class BufImpl implements Buf { private int forwardPos; private int reversePos; + private void setForwardPos(int pos) { + this.forwardPos = pos; + } + public BufImpl(int bufSize) { this.buf = new byte[bufSize]; this.forwardPos = 0; @@ -34,23 +38,50 @@ public BufImpl(byte[] keyBuf) { this.reversePos = keyBuf.length - 1; } + public BufImpl(int bufSize, int dataPos) { + this.buf = new byte[bufSize]; + this.forwardPos = dataPos; + this.reversePos = bufSize - 1; + } + @Override public void write(byte b) { buf[forwardPos++] = b; } + @Override + public void write(int pos, byte b) { + buf[pos] = b; + } + @Override public void write(byte[] b) { System.arraycopy(b, 0, buf, forwardPos, b.length); forwardPos += b.length; } + @Override + public void write(int pos, byte[] b) { + System.arraycopy(b, 0, buf, pos, b.length); + } + @Override public void write(byte[] b, int pos, int length) { System.arraycopy(b, pos, buf, forwardPos, length); forwardPos += length; } + @Override + public void write(int srcPos, byte[] b, int pos, int length) { + System.arraycopy(b, pos, buf, srcPos, length); + } + + @Override + public void writeShort(int pos, short i) { + buf[pos] = (byte) (i >>> 8); + buf[pos + 1] = (byte) i; + } + @Override public void writeInt(int i) { buf[forwardPos++] = (byte) (i >>> 24); @@ -59,6 +90,14 @@ public void writeInt(int i) { buf[forwardPos++] = (byte) i; } + @Override + public void writeInt(int pos, int i) { + buf[pos] = (byte) (i >>> 24); + buf[pos + 1] = (byte) (i >>> 16); + buf[pos + 2] = (byte) (i >>> 8); + buf[pos + 3] = (byte) i; + } + @Override public void writeLong(long l) { buf[forwardPos++] = (byte) (l >>> 56); @@ -71,6 +110,18 @@ public void writeLong(long l) { buf[forwardPos++] = (byte) l; } + @Override + public void writeLong(int pos, long l) { + buf[pos] = (byte) (l >>> 56); + buf[pos + 1] = (byte) (l >>> 48); + buf[pos + 2] = (byte) (l >>> 40); + buf[pos + 3] = (byte) (l >>> 32); + buf[pos + 4] = (byte) (l >>> 24); + buf[pos + 5] = (byte) (l >>> 16); + buf[pos + 6] = (byte) (l >>> 8); + buf[pos + 7] = (byte) l; + } + @Override public byte peek() { return buf[forwardPos]; @@ -101,6 +152,11 @@ public byte read() { return buf[forwardPos++]; } + @Override + public byte readAt(int pos) { + return buf[pos]; + } + @Override public byte[] read(int length) { byte[] b = new byte[length]; @@ -109,12 +165,36 @@ public byte[] read(int length) { return b; } + @Override + public byte[] readAt(int pos, int length) { + byte[] b = new byte[length]; + System.arraycopy(buf, pos, b, 0, length); + return b; + } + @Override public void read(byte[] b, int pos, int length) { System.arraycopy(buf, forwardPos, b, pos, length); forwardPos += length; } + @Override + public void readAt(int srcPos, byte[] b, int pos, int length) { + System.arraycopy(buf, srcPos, b, pos, length); + } + + @Override + public short readShortAt(int pos) { + return (short)(((buf[pos] & 0xFF) << 8) + | buf[pos + 1] & 0xFF); + } + + @Override + public short readShort() { + return (short)(((buf[forwardPos++] & 0xFF) << 8) + | buf[forwardPos++] & 0xFF); + } + @Override public int readInt() { return (((buf[forwardPos++] & 0xFF) << 24) @@ -123,6 +203,14 @@ public int readInt() { | buf[forwardPos++] & 0xFF); } + @Override + public int readIntAt(int pos) { + return (((buf[pos++] & 0xFF) << 24) + | ((buf[pos++] & 0xFF) << 16) + | ((buf[pos++] & 0xFF) << 8) + | buf[pos++] & 0xFF); + } + @Override public long readLong() { long l = buf[forwardPos++] & 0xFF; @@ -133,6 +221,16 @@ public long readLong() { return l; } + @Override + public long readLongAt(int pos) { + long l = buf[pos++] & 0xFF; + for (int i = 0; i < 7; i++) { + l <<= 8; + l |= buf[pos++] & 0xFF; + } + return l; + } + @Override public void reverseWrite(byte b) { buf[reversePos--] = b; @@ -213,11 +311,26 @@ public void resize(int oldSize, int newSize) { } } + @Override + public void setForwardOffset(int pos) { + this.forwardPos = pos; + } + @Override public boolean isEnd() { return (reversePos - forwardPos + 1) == 0; } + @Override + public int restReadableSize() { + return this.buf.length - forwardPos - 1; + } + + @Override + public int readOffset() { + return forwardPos; + } + @Override public byte[] getBytes() { int emptySize = reversePos - forwardPos + 1; diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Config.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Config.java index a24f3e804..8be9c76f7 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Config.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Config.java @@ -22,5 +22,8 @@ public class Config { public final static int KEY_REVERSE_TAG_SIZE = 4; public final static byte CODEC_VERSION = 1; + public final static byte CODEC_VERSION_V2 = 2; + public final static int idUnit = 2; + public final static int offsetUnit = 4; } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordDecoder.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordDecoder.java index 99810cf14..2c68df62b 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordDecoder.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordDecoder.java @@ -20,7 +20,10 @@ import io.dingodb.sdk.common.serial.schema.DingoSchema; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class RecordDecoder { private final int schemaVersion; @@ -42,15 +45,17 @@ private void checkPrefix(Buf buf) { } } - private void checkTag(Buf buf) { + private int checkTag(Buf buf) { int codecVer = buf.reverseRead(); - if (codecVer > Config.CODEC_VERSION) { + if (codecVer > Config.CODEC_VERSION_V2) { throw new RuntimeException( - "Invalid codec version, codec version: " + Config.CODEC_VERSION + ", key codec version " + codecVer + "Invalid codec version, " + ", key codec version " + codecVer ); } buf.reverseSkip(3); + + return codecVer; } private void checkSchemaVersion(Buf buf) { @@ -62,13 +67,15 @@ private void checkSchemaVersion(Buf buf) { } } - private void checkKeyValue(Buf keyBuf, Buf valueBuf) { - checkTag(keyBuf); + private int checkKeyValue(Buf keyBuf, Buf valueBuf) { + int codecVer = checkTag(keyBuf); checkPrefix(keyBuf); checkSchemaVersion(valueBuf); + + return codecVer; } - public Object[] decode(KeyValue keyValue) { + public Object[] decodeV1(KeyValue keyValue) { Buf keyBuf = new BufImpl(keyValue.getKey()); Buf valueBuf = new BufImpl(keyValue.getValue()); @@ -88,7 +95,7 @@ public Object[] decode(KeyValue keyValue) { return record; } - public Object[] decode(KeyValue keyValue, int[] columnIndexes) { + private Object[] decodeV1(KeyValue keyValue, int[] columnIndexes) { Buf keyBuf = new BufImpl(keyValue.getKey()); Buf valueBuf = new BufImpl(keyValue.getValue()); @@ -115,6 +122,133 @@ public Object[] decode(KeyValue keyValue, int[] columnIndexes) { return record; } + public Object[] decodeV2(KeyValue keyValue) { + Buf keyBuf = new BufImpl(keyValue.getKey()); + Buf valueBuf = new BufImpl(keyValue.getValue()); + + checkKeyValue(keyBuf, valueBuf); + + //decode value. + int cntNotNullCols = valueBuf.readShort(); + int cntNullCols = valueBuf.readShort(); + int totalColCnt = cntNotNullCols + cntNullCols; + int idsPos = 4 + 2 * 2; + int offsetPos = idsPos + 2 * totalColCnt; + int dataPos = offsetPos + 4 * totalColCnt; + + //get id-offset map. + Map idOffsetMap = new TreeMap(); + + for (int i = 0; i < totalColCnt; i++) { + short id = valueBuf.readShortAt(idsPos); + int offset = valueBuf.readIntAt(offsetPos); + idOffsetMap.put(id, offset); + idsPos += 2; + offsetPos += 4; + } + + valueBuf.setForwardOffset(dataPos); + + Object[] record = new Object[schemas.size()]; + for (DingoSchema schema : schemas) { + if (schema.isKey()) { + record[schema.getIndex()] = schema.decodeKeyV2(keyBuf); + } else { + if (valueBuf.isEnd()) { + continue; + } + + int index = schema.getIndex(); + if (idOffsetMap.get((short)index) == -1) { + //null column. + record[index] = null; + } else { + record[index] = schema.decodeValueV2(valueBuf); + } + } + } + return record; + } + + private Object[] decodeV2(KeyValue keyValue, int[] columnIndexes) { + Buf keyBuf = new BufImpl(keyValue.getKey()); + Buf valueBuf = new BufImpl(keyValue.getValue()); + + checkKeyValue(keyBuf, valueBuf); + + //decode value. + int cntNotNullCols = valueBuf.readShort(); + int cntNullCols = valueBuf.readShort(); + int totalColCnt = cntNotNullCols + cntNullCols; + int idsPos = 4 + 2 * 2; + int offsetPos = idsPos + 2 * totalColCnt; + int dataPos = offsetPos + 4 * totalColCnt; + + //get id-offset map. + Map idOffsetMap = new TreeMap(); + + for (int i = 0; i < totalColCnt; i++) { + short id = valueBuf.readShortAt(idsPos); + int offset = valueBuf.readIntAt(offsetPos); + idOffsetMap.put(id, offset); + idsPos += 2; + offsetPos += 4; + } + + valueBuf.setForwardOffset(dataPos); + + Object[] record = new Object[columnIndexes.length]; + int i = 0; + for (DingoSchema schema : schemas) { + if (Arrays.binarySearch(columnIndexes, schema.getIndex()) < 0) { + if (schema.isKey()) { + schema.skipKeyV2(keyBuf); + } else if (!valueBuf.isEnd()) { + if (idOffsetMap.get((short)schema.getIndex()) != -1) { //null + schema.skipValueV2(valueBuf); + } + } + } else { + if (schema.isKey()) { + record[i] = schema.decodeKeyV2(keyBuf); + } else if (!valueBuf.isEnd()) { + if (idOffsetMap.get((short)schema.getIndex()) == -1) { //null + record[i] = null; + } else { + record[i] = schema.decodeValueV2(valueBuf); + } + } + i++; + } + } + return record; + } + + public Object[] decode(KeyValue keyValue) { + Buf keyBuf = new BufImpl(keyValue.getKey()); + Buf valueBuf = new BufImpl(keyValue.getValue()); + + int codecVer = checkKeyValue(keyBuf, valueBuf); + if (codecVer <= Config.CODEC_VERSION) { + return decodeV1(keyValue); + } else { + return decodeV2(keyValue); + } + } + + + public Object[] decode(KeyValue keyValue, int[] columnIndexes) { + Buf keyBuf = new BufImpl(keyValue.getKey()); + Buf valueBuf = new BufImpl(keyValue.getValue()); + + int codecVer = checkKeyValue(keyBuf, valueBuf); + if (codecVer <= Config.CODEC_VERSION) { + return decodeV1(keyValue, columnIndexes); + } else { + return decodeV2(keyValue, columnIndexes); + } + } + public Object[] decodeKeyPrefix(byte[] keyPrefix) { Buf keyPrefixBuf = new BufImpl(keyPrefix); @@ -135,7 +269,7 @@ public Object[] decodeKeyPrefix(byte[] keyPrefix) { return record; } - public Object[] decodeValue(KeyValue keyValue, int[] columnIndexes) { + private Object[] decodeValueV1(KeyValue keyValue, int[] columnIndexes) { Buf valueBuf = new BufImpl(keyValue.getValue()); if (valueBuf.readInt() > schemaVersion) { throw new RuntimeException("Wrong Schema Version"); @@ -155,4 +289,62 @@ public Object[] decodeValue(KeyValue keyValue, int[] columnIndexes) { } return record; } + + private Object[] decodeValueV2(KeyValue keyValue, int[] columnIndexes) { + Buf valueBuf = new BufImpl(keyValue.getValue()); + if (valueBuf.readInt() > schemaVersion) { + throw new RuntimeException("Wrong Schema Version"); + } + + //decode value. + int cntNotNullCols = valueBuf.readShort(); + int cntNullCols = valueBuf.readShort(); + int totalColCnt = cntNotNullCols + cntNullCols; + int idsPos = 4 + 2 * 2; + int offsetPos = idsPos + 2 * totalColCnt; + int dataPos = offsetPos + 4 * totalColCnt; + + //get id-offset map. + Map idOffsetMap = new HashMap(); + + for (int i = 0; i < totalColCnt; i++) { + short id = valueBuf.readShortAt(idsPos); + int offset = valueBuf.readIntAt(offsetPos); + idOffsetMap.put(id, offset); + idsPos += 2; + offsetPos += 4; + } + + valueBuf.setForwardOffset(dataPos); + + Object[] record = new Object[schemas.size()]; + for (DingoSchema schema : schemas) { + if (valueBuf.isEnd()) { + break; + } + if (!schema.isKey()) { + if (Arrays.binarySearch(columnIndexes, schema.getIndex()) < 0) { + if (idOffsetMap.get((short)schema.getIndex()) != -1) { + schema.skipValueV2(valueBuf); + } + } else { + record[schema.getIndex()] = schema.decodeValueV2(valueBuf); + } + } + } + return record; + } + + public Object[] decodeValue(KeyValue keyValue, int[] columnIndexes) { + Buf keyBuf = new BufImpl(keyValue.getKey()); + Buf valueBuf = new BufImpl(keyValue.getValue()); + + int codecVer = checkKeyValue(keyBuf, valueBuf); + + if (codecVer == Config.CODEC_VERSION) { + return decodeValueV1(keyValue, columnIndexes); + } else { + return decodeValueV2(keyValue, columnIndexes); + } + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordEncoder.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordEncoder.java index b12358b98..4859edb56 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordEncoder.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/RecordEncoder.java @@ -18,6 +18,7 @@ import io.dingodb.sdk.common.KeyValue; import io.dingodb.sdk.common.serial.schema.DingoSchema; +import lombok.Setter; import java.util.Arrays; import java.util.List; @@ -28,19 +29,132 @@ public class RecordEncoder { private final long id; private int keyBufSize; private int valueBufSize; + private int valueColumnCount; + + @Setter + private int codecVersion; + + private byte[] encodeValueV1(Object[] record) { + Buf valueBuf = new BufImpl(valueBufSize); + encodeSchemaVersion(valueBuf); + for (DingoSchema schema : schemas) { + if (!schema.isKey()) { + schema.encodeValue(valueBuf, record[schema.getIndex()]); + } + } + return valueBuf.getBytes(); + } + + private byte[] encodeValueV2(Object[] record) { + int cntNotNullCols = 0; + int cntNullCols = 0; + int totalLength = 0; + + int cntNotNullPos = 4; //The first 4 bytes is for schema version. + int cntNullPos = cntNotNullPos + 2; + int idPos = cntNullPos + 2; + int offsetPos = idPos + Config.idUnit * this.valueColumnCount; + int dataPos = offsetPos + Config.offsetUnit * this.valueColumnCount; + totalLength = 8; + + Buf valueBuf = new BufImpl(dataPos + valueBufSize, dataPos); + encodeSchemaVersion(0, valueBuf); + for (DingoSchema schema : schemas) { + + if (!schema.isKey()) { + int index = schema.getIndex(); + Object column = record[index]; + + if (column == null) { + cntNullCols++; + + //Write id. + valueBuf.writeShort(idPos, (short) index); + idPos += 2; + + //write offset. + valueBuf.writeInt(offsetPos, -1); + offsetPos += 4; + } else { + cntNotNullCols++; + + //write id. + valueBuf.writeShort(idPos, (short) index); + idPos += 2; + + //write offset. + valueBuf.writeInt(offsetPos, dataPos); + offsetPos += 4; + + //write data. + dataPos += schema.encodeValueV2(valueBuf, record[schema.getIndex()]); + } + } + } + + valueBuf.writeShort(cntNotNullPos, (short) cntNotNullCols); + valueBuf.writeShort(cntNullPos, (short) cntNullCols); + + return valueBuf.getBytes(); + } + + private int getValueColumnCount() { + int count = 0; + for ( DingoSchema schema : schemas ) { + if (!schema.isKey()) { + count++; + } + } + return count; + } public RecordEncoder(int schemaVersion, List schemas, long id) { this.schemaVersion = schemaVersion; this.schemas = schemas; this.id = id; - int[] size = Utils.getApproPerRecordSize(schemas); - this.keyBufSize = size[0]; - this.valueBufSize = size[1]; + this.codecVersion = Config.CODEC_VERSION_V2; + + if (this.codecVersion == Config.CODEC_VERSION_V2) { + int[] size = Utils.getApproPerRecordSizeV2(schemas); + this.keyBufSize = size[0]; + this.valueBufSize = size[1]; + } else { + int[] size = Utils.getApproPerRecordSize(schemas); + this.keyBufSize = size[0]; + this.valueBufSize = size[1]; + } + this.valueColumnCount = getValueColumnCount(); } public RecordEncoder(int schemaVersion, long id) { this.schemaVersion = schemaVersion; this.id = id; + this.codecVersion = Config.CODEC_VERSION_V2; + } + + public RecordEncoder(int codecVersion, int schemaVersion, List schemas, long id) { + this.schemaVersion = schemaVersion; + this.schemas = schemas; + this.id = id; + this.codecVersion = codecVersion; + + if (this.codecVersion == Config.CODEC_VERSION_V2) { + int[] size = Utils.getApproPerRecordSizeV2(schemas); + this.keyBufSize = size[0]; + this.valueBufSize = size[1]; + } else { + int[] size = Utils.getApproPerRecordSize(schemas); + this.keyBufSize = size[0]; + this.valueBufSize = size[1]; + } + + this.valueColumnCount = getValueColumnCount(); + } + + public RecordEncoder(int codecVersion, int schemaVersion, long id) { + this.schemaVersion = schemaVersion; + this.id = id; + this.codecVersion = codecVersion; } private void encodePrefix(Buf buf) { @@ -49,7 +163,7 @@ private void encodePrefix(Buf buf) { } private void encodeTag(Buf buf) { - buf.reverseWrite(Config.CODEC_VERSION); + buf.reverseWrite((byte)this.codecVersion); buf.reverseWrite((byte) 0); buf.reverseWrite((byte) 0); buf.reverseWrite((byte) 0); @@ -66,6 +180,10 @@ private void encodeSchemaVersion(Buf buf) { buf.writeInt(schemaVersion); } + private void encodeSchemaVersion(int pos, Buf buf) { + buf.writeInt(pos, schemaVersion); + } + public KeyValue encode(Object[] record) { KeyValue kv = new KeyValue(null, null); kv.setKey(encodeKey(record)); @@ -73,7 +191,7 @@ public KeyValue encode(Object[] record) { return kv; } - public byte[] encodeKey(Object[] record) { + private byte[] encodeKeyV1(Object[] record) { Buf keyBuf = new BufImpl(keyBufSize); encodeTag(keyBuf); @@ -87,15 +205,34 @@ public byte[] encodeKey(Object[] record) { return keyBuf.getBytes(); } - public byte[] encodeValue(Object[] record) { - Buf valueBuf = new BufImpl(valueBufSize); - encodeSchemaVersion(valueBuf); + private byte[] encodeKeyV2(Object[] record) { + Buf keyBuf = new BufImpl(keyBufSize); + + encodeTag(keyBuf); + encodePrefix(keyBuf); + for (DingoSchema schema : schemas) { - if (!schema.isKey()) { - schema.encodeValue(valueBuf, record[schema.getIndex()]); + if (schema.isKey()) { + schema.encodeKeyV2(keyBuf, record[schema.getIndex()]); } } - return valueBuf.getBytes(); + return keyBuf.getBytes(); + } + + public byte[] encodeKey(Object[] record) { + if ( this.codecVersion == Config.CODEC_VERSION) { + return encodeKeyV1(record); + } else { + return encodeKeyV2(record); + } + } + + public byte[] encodeValue(Object[] record) { + if ( this.codecVersion == Config.CODEC_VERSION) { + return encodeValueV1(record); + } else { + return encodeValueV2(record); + } } public byte[] encodeMinKeyPrefix() { diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Utils.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Utils.java index 23d967f89..7ab7e6c90 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Utils.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/Utils.java @@ -59,4 +59,18 @@ public static int[] getApproPerRecordSize(List schemas) { } return new int[] {keySize, valueSize}; } + + public static int[] getApproPerRecordSizeV2(List schemas) { + // prefix is namespace(1) and common id(8), codec version(1) and other(3) + int keySize = 1 + 8 + 4; + int valueSize = 4; + for (DingoSchema schema : schemas) { + if (schema.isKey()) { + keySize += (schema.getWithNullTagLength() == 1 ? 100 : schema.getWithNullTagLength()); + } else { + valueSize += (schema.getValueLengthV2() == 0 ? 100 : schema.getValueLengthV2()); + } + } + return new int[] {keySize, valueSize}; + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/ArraySchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/ArraySchema.java index 850b917e0..914cb370e 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/ArraySchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/ArraySchema.java @@ -1,3 +1,19 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.dingodb.sdk.common.serial.schema; import io.dingodb.sdk.common.serial.Buf; @@ -9,29 +25,47 @@ public class ArraySchema implements DingoSchema { private boolean isKey; private boolean allowNull = true; private DingoSchema elementSchema; + public ArraySchema(DingoSchema elementSchema) { this.elementSchema = elementSchema; } + @Override public Type getType() { return Type.ARRAY; } + @Override public void setIndex(int index) { this.index = index; } + @Override public int getIndex() { return index; } + @Override public void setIsKey(boolean isKey) { this.isKey = isKey; } + @Override public boolean isKey() { + return isKey; } + + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { + return 1; + } + @Override public int getLength() { if (allowNull) { @@ -39,6 +73,7 @@ public int getLength() { } return elementSchema.getLength(); } + private int getLength(T[] data) { int sum = 0; int elementSchemaSize = 0; @@ -50,7 +85,7 @@ private int getLength(T[] data) { case DOUBLE: case BYTES: elementSchemaSize = elementSchema.getLength(); - if(allowNull) { + if (allowNull) { sum = 5 + elementSchemaSize * data.length; } else { sum = 4 + elementSchemaSize * data.length; @@ -61,7 +96,7 @@ private int getLength(T[] data) { byte[] bytes = ((String)value).getBytes(StandardCharsets.UTF_8); sum += bytes.length; } - if(allowNull) { + if (allowNull) { sum += 5; } else { sum += 4; @@ -74,10 +109,37 @@ private int getLength(T[] data) { return sum; } + private int getLengthV2(T[] data) { + int sum = 0; + int elementSchemaSize = 0; + switch (elementSchema.getType()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case BYTES: + elementSchemaSize = elementSchema.getLength(); + sum = 4 + elementSchemaSize * data.length; + break; + case STRING: + for (T value: data) { + byte[] bytes = ((String)value).getBytes(StandardCharsets.UTF_8); + sum += bytes.length; + } + sum += 4; + break; + default: + break; + } + return sum; + } + @Override public void setAllowNull(boolean allowNull) { this.allowNull = allowNull; } + @Override public boolean isAllowNull() { return allowNull; @@ -85,26 +147,55 @@ public boolean isAllowNull() { @Override public void encodeKey(Buf buf, T[] data) { + + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyV2(Buf buf, T[] data) { + throw new RuntimeException("Array cannot be key"); } @Override public void encodeKeyForUpdate(Buf buf, T[] data) { + + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdateV2(Buf buf, T[] data) { + throw new RuntimeException("Array cannot be key"); } @Override public T[] decodeKey(Buf buf) { + + throw new RuntimeException("Array cannot be key"); + } + + @Override + public T[] decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); } @Override public T[] decodeKeyPrefix(Buf buf) { + throw new RuntimeException("Array cannot be key"); } @Override public void skipKey(Buf buf) { + + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); } @@ -135,6 +226,34 @@ public void encodeValue(Buf buf, T[] data) { } } } + + @Override + public int encodeValueV2(Buf buf, T[] data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = getLengthV2(data); + buf.ensureRemainder(len); + buf.writeInt(data.length); + for (T element : data) { + elementSchema.encodeValue(buf, element); + } + } + } else { + len = getLengthV2(data); + buf.ensureRemainder(len); + buf.writeInt(data.length); + for (T element : data) { + elementSchema.encodeValue(buf, element); + } + } + + return len; + } + @Override public T[] decodeValue(Buf buf) { if (allowNull) { @@ -149,6 +268,17 @@ public T[] decodeValue(Buf buf) { } return array; } + + @Override + public T[] decodeValueV2(Buf buf) { + int length = buf.readInt(); + T[] array = (T[]) new Object[length]; + for (int i = 0; i < length; i++) { + array[i] = elementSchema.decodeValue(buf); + } + return array; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -161,4 +291,12 @@ public void skipValue(Buf buf) { elementSchema.skipValue(buf); } } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + for (int i = 0; i < length; i++) { + elementSchema.skipValueV2(buf); + } + } } \ No newline at end of file diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanListSchema.java index 230c845ae..4dbbe9e80 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanListSchema.java @@ -1,6 +1,24 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.dingodb.sdk.common.serial.schema; import io.dingodb.sdk.common.serial.Buf; +import io.dingodb.sdk.common.serial.schema.DingoSchema; +import io.dingodb.sdk.common.serial.schema.Type; import java.util.ArrayList; import java.util.List; @@ -43,6 +61,11 @@ public boolean isKey() { return isKey; } + @Override + public int getWithNullTagLength() { + return 2; + } + @Override public int getLength() { if (allowNull) { @@ -51,8 +74,9 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { - return 2; + @Override + public int getValueLengthV2() { + return 0; } private int getDataLength() { @@ -73,16 +97,30 @@ public void encodeKey(Buf buf, List data) { throw new RuntimeException("Array cannot be key"); } + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + @Override public void encodeKeyForUpdate(Buf buf, List data) { throw new RuntimeException("Array cannot be key"); } + @Override + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + @Override public List decodeKey(Buf buf) { throw new RuntimeException("Array cannot be key"); } + @Override + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + @Override public List decodeKeyPrefix(Buf buf) { throw new RuntimeException("Array cannot be key"); @@ -94,15 +132,17 @@ public void skipKey(Buf buf) { } @Override - public void encodeKeyPrefix(Buf buf, List data) { + public void skipKeyV2(Buf buf) { throw new RuntimeException("Array cannot be key"); } - private void internalEncodeNull(Buf buf) { - buf.write((byte) 0); + + @Override + public void encodeKeyPrefix(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); } - private void internalEncodeData(Buf buf, Boolean b) { - if (b) { + private void internalEncodeData(Buf buf, Boolean boolVal) { + if (boolVal) { buf.write((byte) 1); } else { buf.write((byte) 0); @@ -120,7 +160,7 @@ public void encodeValue(Buf buf, List data) { buf.write(NOTNULL); buf.writeInt(data.size()); for (Boolean value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeData(buf, value); @@ -130,12 +170,47 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder(4 + data.size()); buf.writeInt(data.size()); for (Boolean value: data) { - if(value == null) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeData(buf, value); + } + } + } + + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.size(); + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Boolean value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeData(buf, value); + } + } + } else { + len = 4 + data.size(); + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Boolean value: data) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeData(buf, value); } } + + return len; } @Override @@ -153,6 +228,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + int size = buf.readInt(); + List data = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + data.add(internalDecodeData(buf)); + } + return data; + } + private Boolean internalDecodeData(Buf buf) { return buf.read() == (byte) 0 ? false : true; } @@ -167,5 +252,11 @@ public void skipValue(Buf buf) { int length = buf.readInt(); buf.skip(length); } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + buf.skip(length); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanSchema.java index 22ad58b8b..ab83167f3 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BooleanSchema.java @@ -64,7 +64,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return getDataLength(); + } + + @Override + public int getWithNullTagLength() { return 2; } @@ -99,6 +105,10 @@ public void encodeKey(Buf buf, Boolean data) { } } + public void encodeKeyV2(Buf buf, Boolean data) { + encodeKey(buf, data); + } + @Override public void encodeKeyForUpdate(Buf buf, Boolean data) { if (allowNull) { @@ -114,12 +124,31 @@ public void encodeKeyForUpdate(Buf buf, Boolean data) { } } + @Override + public void encodeKeyForUpdateV2(Buf buf, Boolean data) { + if (allowNull) { + if (data == null) { + buf.write(NULL); + internalEncodeNull(buf); + } else { + buf.write(NOTNULL); + internalEncodeData(buf, data); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.write(NOTNULL); + internalEncodeData(buf, data); + } + } + private void internalEncodeNull(Buf buf) { buf.write((byte) 0); } - private void internalEncodeData(Buf buf, Boolean b) { - if (b) { + private void internalEncodeData(Buf buf, Boolean boolVal) { + if (boolVal) { buf.write((byte) 1); } else { buf.write((byte) 0); @@ -137,11 +166,16 @@ public Boolean decodeKey(Buf buf) { return internalDecodeData(buf); } + public Boolean decodeKeyV2(Buf buf) { + return decodeKey(buf); + } + @Override public Boolean decodeKeyPrefix(Buf buf) { return decodeKey(buf); } + private Boolean internalDecodeData(Buf buf) { return buf.read() == (byte) 0 ? false : true; } @@ -151,6 +185,10 @@ public void skipKey(Buf buf) { buf.skip(getLength()); } + public void skipKeyV2(Buf buf) { + skipKey(buf); + } + @Override public void encodeKeyPrefix(Buf buf, Boolean data) { encodeKey(buf, data); @@ -173,6 +211,23 @@ public void encodeValue(Buf buf, Boolean data) { } } + @Override + public int encodeValueV2(Buf buf, Boolean data) { + int len = getValueLengthV2(); + buf.ensureRemainder(len); + if (allowNull) { + if (data == null) { + return 0; + } else { + internalEncodeData(buf, data); + } + } else { + internalEncodeData(buf, data); + } + + return len; + } + @Override public Boolean decodeValue(Buf buf) { if (allowNull) { @@ -184,8 +239,18 @@ public Boolean decodeValue(Buf buf) { return internalDecodeData(buf); } + @Override + public Boolean decodeValueV2(Buf buf) { + return internalDecodeData(buf); + } + @Override public void skipValue(Buf buf) { buf.skip(getLength()); } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(getValueLengthV2()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BytesSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BytesSchema.java index 480c427ad..f1377cbb7 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BytesSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/BytesSchema.java @@ -61,6 +61,16 @@ public int getLength() { return 0; } + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { + return 1; + } + @Override public void setAllowNull(boolean allowNull) { this.allowNull = allowNull; @@ -92,6 +102,32 @@ public void encodeKey(Buf buf, byte[] data) { } } + @Override + public void encodeKeyV2(Buf buf, byte[] data) { + if (allowNull) { + if (data == null) { + buf.ensureRemainder(5); + buf.write(NULL); + buf.reverseWriteInt0(); + } else { + buf.ensureRemainder(1); + buf.write(NOTNULL); + int size = internalEncodeKey(buf, data); + buf.ensureRemainder(4); + buf.reverseWriteInt(size); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.ensureRemainder(1); + buf.write(NOTNULL); + int size = internalEncodeKey(buf, data); + buf.ensureRemainder(4); + buf.reverseWriteInt(size); + } + } + private int internalEncodeKey(Buf buf, byte[] data) { int groupNum = data.length / 8; int size = (groupNum + 1) * 9; @@ -133,6 +169,25 @@ public void encodeKeyForUpdate(Buf buf, byte[] data) { } } + @Override + public void encodeKeyForUpdateV2(Buf buf, byte[] data) { + if (allowNull) { + if (data == null) { + buf.write(NULL); + buf.reverseWriteInt0(); + } else { + buf.write(NOTNULL); + buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, data)); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.write(NOTNULL); + buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, data)); + } + } + private int internalEncodeKeyForUpdate(Buf buf, byte[] data) { int groupNum = data.length / 8; int size = (groupNum + 1) * 9; @@ -173,6 +228,16 @@ public byte[] decodeKey(Buf buf) { return internalReadBytes(buf); } + @Override + public byte[] decodeKeyV2(Buf buf) { + if (buf.read() == NULL) { + buf.reverseSkipInt(); + return null; + } + + return internalReadBytes(buf); + } + @Override public byte[] decodeKeyPrefix(Buf buf) { if (allowNull) { @@ -188,7 +253,8 @@ private byte[] internalReadKeyPrefixBytes(Buf buf) { do { length += 9; buf.skip(8); - } while(buf.read() == (byte) 255); + } + while (buf.read() == (byte) 255); int groupNum = length / 9; buf.skip(-1); int reminderZero = 255 - buf.read() & 0xFF; @@ -236,6 +302,11 @@ public void skipKey(Buf buf) { buf.skip(buf.reverseReadInt()); } + @Override + public void skipKeyV2(Buf buf) { + buf.skip(buf.reverseReadInt()); + } + @Override public void encodeKeyPrefix(Buf buf, byte[] data) { if (allowNull) { @@ -270,6 +341,31 @@ public void encodeValue(Buf buf, byte[] data) { } } + @Override + public int encodeValueV2(Buf buf, byte[] data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.length; + buf.ensureRemainder(len); + + buf.writeInt(data.length); + buf.write(data); + } + } else { + len = 4 + data.length; + buf.ensureRemainder(len); + + buf.writeInt(data.length); + buf.write(data); + } + + return len; + } + @Override public byte[] decodeValue(Buf buf) { if (allowNull) { @@ -282,6 +378,11 @@ public byte[] decodeValue(Buf buf) { return buf.read(buf.readInt()); } + @Override + public byte[] decodeValueV2(Buf buf) { + return buf.read(buf.readInt()); + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -292,4 +393,9 @@ public void skipValue(Buf buf) { buf.skip(buf.readInt()); } } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(buf.readInt()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DingoSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DingoSchema.java index 131800f9c..5fef4d767 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DingoSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DingoSchema.java @@ -23,6 +23,7 @@ public interface DingoSchema { byte NULL = 0; byte NOTNULL = 1; + //Common interfaces for both v1 and v2. Type getType(); void setIndex(int index); @@ -35,20 +36,45 @@ public interface DingoSchema { int getLength(); + int getWithNullTagLength(); + + int getValueLengthV2(); + void setAllowNull(boolean allowNull); boolean isAllowNull(); + //v1 interfaces. void encodeKey(Buf buf, T data); + void encodeKeyForUpdate(Buf buf, T data); + T decodeKey(Buf buf); + T decodeKeyPrefix(Buf buf); + void skipKey(Buf buf); void encodeKeyPrefix(Buf buf, T data); void encodeValue(Buf buf, T data); + T decodeValue(Buf buf); + void skipValue(Buf buf); + //v2 interfaces. + void encodeKeyV2(Buf buf, T data); + + void encodeKeyForUpdateV2(Buf buf, T data); + + T decodeKeyV2(Buf buf); + + void skipKeyV2(Buf buf); + + int encodeValueV2(Buf buf, T data); + + T decodeValueV2(Buf buf); + + void skipValueV2(Buf buf); } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleListSchema.java index 6d10d6f3c..8b4566916 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleListSchema.java @@ -67,7 +67,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { return 9; } @@ -86,13 +92,34 @@ public boolean isAllowNull() { } @Override - public void encodeKey(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKey(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public void encodeKeyForUpdate(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public List decodeKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyForUpdate(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override public List decodeKeyPrefix(Buf buf) { @@ -105,10 +132,15 @@ public void skipKey(Buf buf) { } @Override - public void encodeKeyPrefix(Buf buf, List data) { + public void skipKeyV2(Buf buf) { throw new RuntimeException("Array cannot be key"); } + @Override + public void encodeKeyPrefix(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + @Override public void encodeValue(Buf buf, List data) { if (allowNull) { @@ -120,7 +152,7 @@ public void encodeValue(Buf buf, List data) { buf.write(NOTNULL); buf.writeInt(data.size()); for (Double value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -130,12 +162,47 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder(8 + data.size() * 8); buf.writeInt(data.size()); for (Double value: data) { - if(value == null) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + } + + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.size() * 8; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Double value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + } else { + len = 4 + data.size() * 8; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Double value: data) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); } } + + return len; } private void internalEncodeValue(Buf buf, Double data) { @@ -150,7 +217,7 @@ private void internalEncodeValue(Buf buf, Double data) { buf.write((byte) ln); } - private Double internalDecodeData (Buf buf){ + private Double internalDecodeData( Buf buf ) { long l = buf.read() & 0xFF; for (int i = 0; i < 7; i++) { l <<= 8; @@ -175,6 +242,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + int size = buf.readInt(); + List data = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + data.add(internalDecodeData(buf)); + } + return data; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -185,4 +262,10 @@ public void skipValue(Buf buf) { int length = buf.readInt(); buf.skip(length * 8); } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + buf.skip(length * 8); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleSchema.java index 3c5f8f1e8..eb6b6fb2e 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/DoubleSchema.java @@ -64,7 +64,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return getDataLength(); + } + + @Override + public int getWithNullTagLength() { return 9; } @@ -99,6 +105,10 @@ public void encodeKey(Buf buf, Double data) { } } + public void encodeKeyV2(Buf buf, Double data) { + encodeKey(buf, data); + } + @Override public void encodeKeyForUpdate(Buf buf, Double data) { if (allowNull) { @@ -114,6 +124,25 @@ public void encodeKeyForUpdate(Buf buf, Double data) { } } + @Override + public void encodeKeyForUpdateV2(Buf buf, Double data) { + if (allowNull) { + if (data == null) { + buf.write(NULL); + internalEncodeNull(buf); + } else { + buf.write(NOTNULL); + internalEncodeKey(buf, data); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.write(NOTNULL); + internalEncodeKey(buf, data); + } + } + private void internalEncodeNull(Buf buf) { buf.write((byte) 0); buf.write((byte) 0); @@ -174,6 +203,10 @@ public Double decodeKey(Buf buf) { return Double.longBitsToDouble(l); } + public Double decodeKeyV2(Buf buf) { + return decodeKey(buf); + } + @Override public Double decodeKeyPrefix(Buf buf) { return decodeKey(buf); @@ -184,6 +217,11 @@ public void skipKey(Buf buf) { buf.skip(getLength()); } + @Override + public void skipKeyV2(Buf buf) { + skipKey(buf); + } + @Override public void encodeKeyPrefix(Buf buf, Double data) { encodeKey(buf, data); @@ -206,6 +244,24 @@ public void encodeValue(Buf buf, Double data) { } } + @Override + public int encodeValueV2(Buf buf, Double data) { + int len = getValueLengthV2(); + buf.ensureRemainder(len); + + if (allowNull) { + if (data == null) { + return 0; + } else { + internalEncodeValue(buf, data); + } + } else { + internalEncodeValue(buf, data); + } + + return len; + } + private void internalEncodeValue(Buf buf, Double data) { long ln = Double.doubleToLongBits(data); buf.write((byte) (ln >>> 56)); @@ -234,8 +290,23 @@ public Double decodeValue(Buf buf) { return Double.longBitsToDouble(l); } + @Override + public Double decodeValueV2(Buf buf) { + long l = buf.read() & 0xFF; + for (int i = 0; i < 7; i++) { + l <<= 8; + l |= buf.read() & 0xFF; + } + return Double.longBitsToDouble(l); + } + @Override public void skipValue(Buf buf) { buf.skip(getLength()); } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(getValueLengthV2()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatListSchema.java index a4c54bdbb..c90988433 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatListSchema.java @@ -67,7 +67,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { return 5; } @@ -86,13 +92,34 @@ public boolean isAllowNull() { } @Override - public void encodeKey(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKey(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public void encodeKeyForUpdate(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public List decodeKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyForUpdate(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override public List decodeKeyPrefix(Buf buf) { @@ -104,6 +131,11 @@ public void skipKey(Buf buf) { throw new RuntimeException("Array cannot be key"); } + @Override + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + @Override public void encodeKeyPrefix(Buf buf, List data) { throw new RuntimeException("Array cannot be key"); @@ -120,7 +152,7 @@ public void encodeValue(Buf buf, List data) { buf.write(NOTNULL); buf.writeInt(data.size()); for (Float value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -130,7 +162,7 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder(4 + data.size() * 4); buf.writeInt(data.size()); for (Float value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -138,6 +170,41 @@ public void encodeValue(Buf buf, List data) { } } + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.size() * 4; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Float value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + } else { + len = 4 + data.size() * 4; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Float value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + + return len; + } + private void internalEncodeValue(Buf buf, Float data) { int in = Float.floatToIntBits(data); buf.write((byte) (in >>> 24)); @@ -146,7 +213,7 @@ private void internalEncodeValue(Buf buf, Float data) { buf.write((byte) in); } - private Float internalDecodeData (Buf buf){ + private Float internalDecodeData(Buf buf) { int in = buf.read() & 0xFF; for (int i = 0; i < 3; i++) { in <<= 8; @@ -171,6 +238,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + int size = buf.readInt(); + List data = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + data.add(internalDecodeData(buf)); + } + return data; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -181,4 +258,10 @@ public void skipValue(Buf buf) { int length = buf.readInt(); buf.skip(length * 4); } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + buf.skip(length * 4); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatSchema.java index 4653dae26..a0aa24cb9 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/FloatSchema.java @@ -64,7 +64,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return getDataLength(); + } + + @Override + public int getWithNullTagLength() { return 5; } @@ -99,6 +105,10 @@ public void encodeKey(Buf buf, Float data) { } } + public void encodeKeyV2(Buf buf, Float data) { + encodeKey(buf, data); + } + @Override public void encodeKeyForUpdate(Buf buf, Float data) { if (allowNull) { @@ -114,6 +124,25 @@ public void encodeKeyForUpdate(Buf buf, Float data) { } } + @Override + public void encodeKeyForUpdateV2(Buf buf, Float data) { + if (allowNull) { + if (data == null) { + buf.write(NULL); + internalEncodeNull(buf); + } else { + buf.write(NOTNULL); + internalEncodeKey(buf, data); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.write(NOTNULL); + internalEncodeKey(buf, data); + } + } + private void internalEncodeNull(Buf buf) { buf.write((byte) 0); buf.write((byte) 0); @@ -162,6 +191,10 @@ public Float decodeKey(Buf buf) { return Float.intBitsToFloat(in); } + public Float decodeKeyV2(Buf buf) { + return decodeKey(buf); + } + @Override public Float decodeKeyPrefix(Buf buf) { return decodeKey(buf); @@ -172,11 +205,16 @@ public void skipKey(Buf buf) { buf.skip(getLength()); } + public void skipKeyV2(Buf buf) { + skipKey(buf); + } + @Override public void encodeKeyPrefix(Buf buf, Float data) { encodeKey(buf, data); } + @Override public void encodeValue(Buf buf, Float data) { if (allowNull) { @@ -194,6 +232,24 @@ public void encodeValue(Buf buf, Float data) { } } + @Override + public int encodeValueV2(Buf buf, Float data) { + int len = getValueLengthV2(); + buf.ensureRemainder(len); + + if (allowNull) { + if (data == null) { + return 0; + } else { + internalEncodeValue(buf, data); + } + } else { + internalEncodeValue(buf, data); + } + + return len; + } + private void internalEncodeValue(Buf buf, Float data) { int in = Float.floatToIntBits(data); buf.write((byte) (in >>> 24)); @@ -218,8 +274,23 @@ public Float decodeValue(Buf buf) { return Float.intBitsToFloat(in); } + @Override + public Float decodeValueV2(Buf buf) { + int in = buf.read() & 0xFF; + for (int i = 0; i < 3; i++) { + in <<= 8; + in |= buf.read() & 0xFF; + } + return Float.intBitsToFloat(in); + } + @Override public void skipValue(Buf buf) { buf.skip(getLength()); } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(getValueLengthV2()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerListSchema.java index fdd4405de..07dffcb01 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerListSchema.java @@ -67,7 +67,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { return 5; } @@ -86,10 +92,24 @@ public boolean isAllowNull() { } @Override - public void encodeKey(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKey(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdate(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public void encodeKeyForUpdate(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } private void internalEncodeNull(Buf buf) { buf.write((byte) 0); @@ -106,15 +126,30 @@ private void internalEncodeKey(Buf buf, Integer data) { } @Override - public List decodeKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public List decodeKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override public List decodeKeyPrefix(Buf buf) { + throw new RuntimeException("Array cannot be key"); } @Override public void skipKey(Buf buf) { + + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); } @@ -135,7 +170,7 @@ public void encodeValue(Buf buf, List data) { buf.writeInt(data.size()); // 1 for (Integer value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -145,7 +180,7 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder(4 + data.size() * 4); buf.writeInt(data.size()); for (Integer value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -153,6 +188,42 @@ public void encodeValue(Buf buf, List data) { } } + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.size() * 4; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + // 1 + for (Integer value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + } else { + len = 4 + data.size() * 4; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Integer value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + + return len; + } + private void internalEncodeValue(Buf buf, Integer data) { buf.write((byte) (data >>> 24)); buf.write((byte) (data >>> 16)); @@ -160,7 +231,7 @@ private void internalEncodeValue(Buf buf, Integer data) { buf.write((byte) data.intValue()); } - private Integer internalDecodeData (Buf buf){ + private Integer internalDecodeData( Buf buf ) { return (((buf.read() & 0xFF) << 24) | ((buf.read() & 0xFF) << 16) | ((buf.read() & 0xFF) << 8) @@ -183,6 +254,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + int size = buf.readInt(); + List data = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + data.add(internalDecodeData(buf)); + } + return data; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -193,4 +274,10 @@ public void skipValue(Buf buf) { int length = buf.readInt(); buf.skip(length * 4); } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + buf.skip(length * 4); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerSchema.java index 088a0b614..a72001ce0 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/IntegerSchema.java @@ -64,7 +64,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return getDataLength(); + } + + @Override + public int getWithNullTagLength() { return 5; } @@ -99,6 +105,11 @@ public void encodeKey(Buf buf, Integer data) { } } + @Override + public void encodeKeyV2(Buf buf, Integer data) { + encodeKey(buf, data); + } + @Override public void encodeKeyForUpdate(Buf buf, Integer data) { if (allowNull) { @@ -114,6 +125,11 @@ public void encodeKeyForUpdate(Buf buf, Integer data) { } } + @Override + public void encodeKeyForUpdateV2(Buf buf, Integer data) { + encodeKeyForUpdate(buf, data); + } + private void internalEncodeNull(Buf buf) { buf.write((byte) 0); buf.write((byte) 0); @@ -142,6 +158,11 @@ public Integer decodeKey(Buf buf) { | (buf.read() & 0xFF)); } + @Override + public Integer decodeKeyV2(Buf buf) { + return decodeKey(buf); + } + @Override public Integer decodeKeyPrefix(Buf buf) { return decodeKey(buf); @@ -152,6 +173,11 @@ public void skipKey(Buf buf) { buf.skip(getLength()); } + @Override + public void skipKeyV2(Buf buf) { + skipKey(buf); + } + @Override public void encodeKeyPrefix(Buf buf, Integer data) { encodeKey(buf, data); @@ -174,6 +200,23 @@ public void encodeValue(Buf buf, Integer data) { } } + @Override + public int encodeValueV2(Buf buf, Integer data) { + int len = getValueLengthV2(); + buf.ensureRemainder(len); + if (allowNull) { + if (data == null) { + return 0; + } else { + internalEncodeValue(buf, data); + } + } else { + internalEncodeValue(buf, data); + } + + return len; + } + private void internalEncodeValue(Buf buf, Integer data) { buf.write((byte) (data >>> 24)); buf.write((byte) (data >>> 16)); @@ -195,8 +238,21 @@ public Integer decodeValue(Buf buf) { | (buf.read() & 0xFF)); } + @Override + public Integer decodeValueV2(Buf buf) { + return (((buf.read() & 0xFF) << 24) + | ((buf.read() & 0xFF) << 16) + | ((buf.read() & 0xFF) << 8) + | (buf.read() & 0xFF)); + } + @Override public void skipValue(Buf buf) { buf.skip(getLength()); } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(getValueLengthV2()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongListSchema.java index 0c03c9176..6f618bdde 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongListSchema.java @@ -67,7 +67,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return 0; + } + + @Override + public int getWithNullTagLength() { return 9; } @@ -86,10 +92,24 @@ public boolean isAllowNull() { } @Override - public void encodeKey(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKey(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public void encodeKeyForUpdate(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdate(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } private void internalEncodeNull(Buf buf) { buf.write((byte) 0); @@ -102,10 +122,19 @@ private void internalEncodeNull(Buf buf) { buf.write((byte) 0); } - private void internalEncodeKey(Buf buf, Long data) {throw new RuntimeException("Array cannot be key");} + private void internalEncodeKey(Buf buf, Long data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override - public List decodeKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override public List decodeKeyPrefix(Buf buf) { @@ -117,6 +146,11 @@ public void skipKey(Buf buf) { throw new RuntimeException("Array cannot be key"); } + @Override + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + @Override public void encodeKeyPrefix(Buf buf, List data) { throw new RuntimeException("Array cannot be key"); @@ -133,7 +167,7 @@ public void encodeValue(Buf buf, List data) { buf.write(NOTNULL); buf.writeInt(data.size()); for (Long value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -143,7 +177,7 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder(8 + data.size() * 8); buf.writeInt(data.size()); for (Long value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } internalEncodeValue(buf, value); @@ -151,6 +185,41 @@ public void encodeValue(Buf buf, List data) { } } + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4 + data.size() * 8; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Long value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + } else { + len = 4 + data.size() * 8; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (Long value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + internalEncodeValue(buf, value); + } + } + + return len; + } + private void internalEncodeValue(Buf buf, Long data) { buf.write((byte) (data >>> 56)); buf.write((byte) (data >>> 48)); @@ -162,7 +231,7 @@ private void internalEncodeValue(Buf buf, Long data) { buf.write((byte) data.longValue()); } - private Long internalDecodeData (Buf buf){ + private Long internalDecodeData( Buf buf ) { long l = buf.read() & 0xFF; for (int i = 0; i < 7; i++) { l <<= 8; @@ -187,6 +256,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + int size = buf.readInt(); + List data = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + data.add(internalDecodeData(buf)); + } + return data; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -197,4 +276,10 @@ public void skipValue(Buf buf) { int length = buf.readInt(); buf.skip(length * 8); } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + buf.skip(length * 8); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongSchema.java index 336984193..b81aed526 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/LongSchema.java @@ -64,7 +64,13 @@ public int getLength() { return getDataLength(); } - private int getWithNullTagLength() { + @Override + public int getValueLengthV2() { + return getDataLength(); + } + + @Override + public int getWithNullTagLength() { return 9; } @@ -99,6 +105,10 @@ public void encodeKey(Buf buf, Long data) { } } + public void encodeKeyV2(Buf buf, Long data) { + encodeKey(buf, data); + } + @Override public void encodeKeyForUpdate(Buf buf, Long data) { if (allowNull) { @@ -114,6 +124,10 @@ public void encodeKeyForUpdate(Buf buf, Long data) { } } + public void encodeKeyForUpdateV2(Buf buf, Long data) { + encodeKeyForUpdate(buf, data); + } + private void internalEncodeNull(Buf buf) { buf.write((byte) 0); buf.write((byte) 0); @@ -153,6 +167,10 @@ public Long decodeKey(Buf buf) { return l; } + public Long decodeKeyV2(Buf buf) { + return decodeKey(buf); + } + @Override public Long decodeKeyPrefix(Buf buf) { return decodeKey(buf); @@ -163,6 +181,10 @@ public void skipKey(Buf buf) { buf.skip(getLength()); } + public void skipKeyV2(Buf buf) { + skipKey(buf); + } + @Override public void encodeKeyPrefix(Buf buf, Long data) { encodeKey(buf, data); @@ -185,6 +207,24 @@ public void encodeValue(Buf buf, Long data) { } } + @Override + public int encodeValueV2(Buf buf, Long data) { + int len = getValueLengthV2(); + buf.ensureRemainder(len); + + if (allowNull) { + if (data == null) { + return 0; + } else { + internalEncodeValue(buf, data); + } + } else { + internalEncodeValue(buf, data); + } + + return len; + } + private void internalEncodeValue(Buf buf, Long data) { buf.write((byte) (data >>> 56)); buf.write((byte) (data >>> 48)); @@ -212,8 +252,23 @@ public Long decodeValue(Buf buf) { return l; } + @Override + public Long decodeValueV2(Buf buf) { + long l = buf.read() & 0xFF; + for (int i = 0; i < 7; i++) { + l <<= 8; + l |= buf.read() & 0xFF; + } + return l; + } + @Override public void skipValue(Buf buf) { buf.skip(getLength()); } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(getValueLengthV2()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringListSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringListSchema.java index 2df7b0353..8918b4b0b 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringListSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringListSchema.java @@ -65,6 +65,16 @@ public int getLength() { return 0; } + @Override + public int getWithNullTagLength() { + return 1; + } + + @Override + public int getValueLengthV2() { + return 0; + } + @Override public void setAllowNull(boolean allowNull) { this.allowNull = allowNull; @@ -75,7 +85,13 @@ public boolean isAllowNull() { return allowNull; } - public void encodeKey(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKey(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + public void encodeKeyV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } private int internalEncodeKey(Buf buf, byte[] data) { int groupNum = data.length / 8; @@ -104,13 +120,30 @@ private int internalEncodeKey(Buf buf, byte[] data) { } @Override - public void encodeKeyForUpdate(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyForUpdate(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override - public List decodeKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public void encodeKeyForUpdateV2(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override - public List decodeKeyPrefix(Buf buf) {throw new RuntimeException("Array cannot be key");} + public List decodeKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public List decodeKeyPrefix(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + private byte[] internalReadBytes(Buf buf) { int length = buf.reverseReadInt(); int groupNum = length / 9; @@ -134,10 +167,19 @@ private byte[] internalReadBytes(Buf buf) { } @Override - public void skipKey(Buf buf) {throw new RuntimeException("Array cannot be key");} + public void skipKey(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } @Override - public void encodeKeyPrefix(Buf buf, List data) {throw new RuntimeException("Array cannot be key");} + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Array cannot be key"); + } + + @Override + public void encodeKeyPrefix(Buf buf, List data) { + throw new RuntimeException("Array cannot be key"); + } @Override public void encodeValue(Buf buf, List data) { @@ -150,7 +192,7 @@ public void encodeValue(Buf buf, List data) { buf.write(NOTNULL); buf.writeInt(data.size()); for (String value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } byte[] bytes = value.getBytes(StandardCharsets.UTF_8); @@ -163,7 +205,7 @@ public void encodeValue(Buf buf, List data) { buf.ensureRemainder( 4 ); buf.writeInt(data.size()); for (String value: data) { - if(value == null) { + if (value == null) { throw new IllegalArgumentException("Array type sub-elements do not support null values"); } byte[] bytes = value.getBytes(StandardCharsets.UTF_8); @@ -174,6 +216,51 @@ public void encodeValue(Buf buf, List data) { } } + @Override + public int encodeValueV2(Buf buf, List data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + len = 4; + buf.ensureRemainder(len); + + buf.writeInt(data.size()); + for (String value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + } + } else { + len = 4; + buf.ensureRemainder( len); + + buf.writeInt(data.size()); + for (String value: data) { + if (value == null) { + throw new IllegalArgumentException("Array type sub-elements do not support null values"); + } + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + } + + return len; + } + @Override public List decodeValue(Buf buf) { if (allowNull) { @@ -189,6 +276,16 @@ public List decodeValue(Buf buf) { return data; } + @Override + public List decodeValueV2(Buf buf) { + List data = new ArrayList<>(); + int length = buf.readInt(); + for (int i = 0; i < length; i++) { + data.add(new String(buf.read(buf.readInt()), StandardCharsets.UTF_8)); + } + return data; + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -198,8 +295,17 @@ public void skipValue(Buf buf) { } int length = buf.readInt(); for (int i = 0; i < length; i++) { - int str_size = buf.readInt(); - buf.skip(str_size); + int strSize = buf.readInt(); + buf.skip(strSize); + } + } + + @Override + public void skipValueV2(Buf buf) { + int length = buf.readInt(); + for (int i = 0; i < length; i++) { + int strSize = buf.readInt(); + buf.skip(strSize); } } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringSchema.java index 16dbb49af..4630b512b 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/StringSchema.java @@ -63,6 +63,16 @@ public int getLength() { return 0; } + @Override + public int getWithNullTagLength() { + return 1; + } + + @Override + public int getValueLengthV2() { + return 0; + } + @Override public void setAllowNull(boolean allowNull) { this.allowNull = allowNull; @@ -95,6 +105,33 @@ public void encodeKey(Buf buf, String data) { } } + public void encodeKeyV2(Buf buf, String data) { + if (allowNull) { + if (data == null) { + buf.ensureRemainder(1); + buf.write(NULL); + //buf.reverseWriteInt0(); + } else { + buf.ensureRemainder(1); + buf.write(NOTNULL); + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + int size = internalEncodeKey(buf, bytes); + //buf.ensureRemainder(4); + //buf.reverseWriteInt(size); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + //buf.ensureRemainder(1); + //buf.write(NOTNULL); + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + int size = internalEncodeKey(buf, bytes); + //buf.ensureRemainder(4); + //buf.reverseWriteInt(size); + } + } + private int internalEncodeKey(Buf buf, byte[] data) { int groupNum = data.length / 8; int size = (groupNum + 1) * 9; @@ -133,11 +170,35 @@ public void encodeKeyForUpdate(Buf buf, String data) { buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, bytes)); } } else { + buf.write(NOTNULL); byte[] bytes = data.getBytes(StandardCharsets.UTF_8); buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, bytes)); } } + @Override + public void encodeKeyForUpdateV2(Buf buf, String data) { + if (allowNull) { + if (data == null) { + buf.write(NULL); + //buf.reverseWriteInt0(); + } else { + buf.write(NOTNULL); + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + //buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, bytes)); + internalEncodeKeyForUpdate(buf, bytes); + } + } else { + if (data == null) { + throw new RuntimeException("Data is not allow as null."); + } + buf.write(NOTNULL); + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + //buf.reverseWriteInt(internalEncodeKeyForUpdate(buf, bytes)); + internalEncodeKeyForUpdate(buf, bytes); + } + } + private int internalEncodeKeyForUpdate(Buf buf, byte[] data) { int groupNum = data.length / 8; int size = (groupNum + 1) * 9; @@ -178,6 +239,20 @@ public String decodeKey(Buf buf) { return new String(internalReadBytes(buf), StandardCharsets.UTF_8); } + @Override + public String decodeKeyV2(Buf buf) { + if (allowNull) { + if (buf.read() == NULL) { + //buf.reverseSkipInt(); + return null; + } + } + + return new String(internalReadBytesV2(buf), StandardCharsets.UTF_8); + } + + //This interface is both used by v1 and v2. We use same way to decode key prefix. + //In the new way, we decode string value directly but not by length field. @Override public String decodeKeyPrefix(Buf buf) { if (allowNull) { @@ -193,7 +268,8 @@ private byte[] internalReadKeyPrefixBytes(Buf buf) { do { length += 9; buf.skip(8); - } while(buf.read() == (byte) 255); + } + while (buf.read() == (byte) 255); int groupNum = length / 9; buf.skip(-1); int reminderZero = 255 - buf.read() & 0xFF; @@ -236,6 +312,10 @@ private byte[] internalReadBytes(Buf buf) { return data; } + private byte[] internalReadBytesV2(Buf buf) { + return internalReadKeyPrefixBytes(buf); + } + @Override public void skipKey(Buf buf) { if (allowNull) { @@ -245,6 +325,16 @@ public void skipKey(Buf buf) { } } + @Override + public void skipKeyV2(Buf buf) { + if (allowNull) { + //buf.skip(buf.reverseReadInt() + 1); + buf.skip(1); + } + + internalReadBytesV2(buf); + } + @Override public void encodeKeyPrefix(Buf buf, String data) { if (allowNull) { @@ -283,6 +373,33 @@ public void encodeValue(Buf buf, String data) { } } + @Override + public int encodeValueV2(Buf buf, String data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + } else { + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + + return len; + } + @Override public String decodeValue(Buf buf) { if (allowNull) { @@ -295,6 +412,11 @@ public String decodeValue(Buf buf) { return new String(buf.read(buf.readInt()), StandardCharsets.UTF_8); } + @Override + public String decodeValueV2(Buf buf) { + return new String(buf.read(buf.readInt()), StandardCharsets.UTF_8); + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -305,4 +427,9 @@ public void skipValue(Buf buf) { buf.skip(buf.readInt()); } } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(buf.readInt()); + } } diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/VectorSchema.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/VectorSchema.java index 98daabb4a..d8005d671 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/VectorSchema.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/common/serial/schema/VectorSchema.java @@ -65,6 +65,16 @@ public int getLength() { return 0; } + @Override + public int getWithNullTagLength() { + return 1; + } + + @Override + public int getValueLengthV2() { + return 0; + } + @Override public void setAllowNull(boolean allowNull) { this.allowNull = allowNull; @@ -79,16 +89,30 @@ public void encodeKey(Buf buf, String data) { throw new RuntimeException("Vector cannot be key"); } + public void encodeKeyV2(Buf buf, String data) { + throw new RuntimeException("Vector cannot be key"); + } + @Override public void encodeKeyForUpdate(Buf buf, String data) { throw new RuntimeException("Vector cannot be key"); } + @Override + public void encodeKeyForUpdateV2(Buf buf, String data) { + throw new RuntimeException("Vector cannot be key"); + } + @Override public String decodeKey(Buf buf) { throw new RuntimeException("Vector cannot be key"); } + @Override + public String decodeKeyV2(Buf buf) { + throw new RuntimeException("Vector cannot be key"); + } + @Override public String decodeKeyPrefix(Buf buf) { throw new RuntimeException("Vector cannot be key"); @@ -99,6 +123,11 @@ public void skipKey(Buf buf) { throw new RuntimeException("Vector cannot be key"); } + @Override + public void skipKeyV2(Buf buf) { + throw new RuntimeException("Vector cannot be key"); + } + @Override public void encodeKeyPrefix(Buf buf, String data) { throw new RuntimeException("Vector cannot be key"); @@ -125,6 +154,34 @@ public void encodeValue(Buf buf, String data) { } } + @Override + public int encodeValueV2(Buf buf, String data) { + int len = 0; + + if (allowNull) { + if (data == null) { + return 0; + } else { + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.write(NOTNULL); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + } else { + byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + buf.ensureRemainder(4 + bytes.length); + buf.writeInt(bytes.length); + buf.write(bytes); + + len += 4 + bytes.length; + } + + return len; + } + @Override public String decodeValue(Buf buf) { if (allowNull) { @@ -137,6 +194,11 @@ public String decodeValue(Buf buf) { return new String(buf.read(buf.readInt()), StandardCharsets.UTF_8); } + @Override + public String decodeValueV2(Buf buf) { + return new String(buf.read(buf.readInt()), StandardCharsets.UTF_8); + } + @Override public void skipValue(Buf buf) { if (allowNull) { @@ -147,4 +209,9 @@ public void skipValue(Buf buf) { buf.skip(buf.readInt()); } } + + @Override + public void skipValueV2(Buf buf) { + buf.skip(buf.readInt()); + } } diff --git a/java/dingo-sdk/src/test/java/sdk/common/serial/Test.java b/java/dingo-sdk/src/test/java/sdk/common/serial/Test.java index c992fd36e..4cb447192 100644 --- a/java/dingo-sdk/src/test/java/sdk/common/serial/Test.java +++ b/java/dingo-sdk/src/test/java/sdk/common/serial/Test.java @@ -44,7 +44,7 @@ public static void main(String[] args) { for(Object[] record : records) { KeyValue kv = re.encode(record); kvs.add(kv); - System.out.println("=====" + kv.getValue().length); + //System.out.println("=====" + kv.getValue().length); } long tag5 = System.currentTimeMillis(); @@ -210,6 +210,7 @@ private static List getTable() { private static List getRecords() { byte[] k1 = new byte[1049076]; + for (int i = 0; i < 1049076; i++) { k1[i] = ((byte) (i%256)); } @@ -233,7 +234,8 @@ private static List getRecords() { }; List allRecord = new ArrayList<>(); - for (int i = 0; i < 100; i ++) { + //for (int i = 0; i < 100; i ++) { + for (int i = 0; i < 10000; i ++) { Object[] r = new Object[12]; System.arraycopy(record, 1, r, 1, 11); r[0] = i; diff --git a/java/dingo-sdk/src/test/java/sdk/common/serial/TestAllType.java b/java/dingo-sdk/src/test/java/sdk/common/serial/TestAllType.java index 5e8933d9f..13f6e882a 100644 --- a/java/dingo-sdk/src/test/java/sdk/common/serial/TestAllType.java +++ b/java/dingo-sdk/src/test/java/sdk/common/serial/TestAllType.java @@ -44,11 +44,52 @@ public void setUp() { recordEncoder = new RecordEncoder(schemaVersion, schemas, commonId); recordDecoder = new RecordDecoder(schemaVersion, schemas, commonId); } + + /* @Test public void testEncodeAndDecode() { Object[] records = getRecords(); + recordEncoder.setCodecVersion( 0x1 ); + + KeyValue kv = recordEncoder.encode(records); + + Object[] result = recordDecoder.decode(kv); + // Positive test case + Assert.assertNotNull(result); + Assert.assertEquals(records.length, result.length); + // Negative test case + for (int i = 0; i < schemas.size(); i++) { + System.out.println(records[i]+","+result[i]); + switch (schemas.get(i).getType()) { + case BYTES: + byte[] e = (byte[]) records[i]; + byte[] r = (byte[]) result[i]; + Assert.assertArrayEquals(e, r); + break; + case BOOLEANLIST: + case STRINGLIST: + case DOUBLELIST: + case FLOATLIST: + case INTEGERLIST: + case LONGLIST: + val e_val = records[i]; + val r_cal = result[i]; + Assert.assertTrue(e_val.equals(r_cal)); + break; + default: + Assert.assertEquals(records[i], result[i]); + break; + } + } + } + */ + + @Test + public void testEncodeAndDecodeV2() { + Object[] records = getRecords(); + recordEncoder.setCodecVersion( 0x2 ); KeyValue kv = recordEncoder.encode(records); Object[] result = recordDecoder.decode(kv); @@ -80,10 +121,64 @@ public void testEncodeAndDecode() { } } } + + /* @Test public void testDecodeWithColumnIndexes() { Object[] records = getRecords(); + recordEncoder.setCodecVersion( 0x1 ); + + KeyValue kv = recordEncoder.encode(records); + + int[] columnIndexes = new int[]{0, 3, 6, 8, 10, 11, 13, 15, 16, 18, 19, 21, 22}; + Object[] result = recordDecoder.decode(kv, columnIndexes); + // Positive test case + Assert.assertNotNull(result); + Assert.assertEquals(columnIndexes.length, result.length); + // Negative test case + int j = 0; + for (int i = 0; i < schemas.size(); i++) { + if(j < columnIndexes.length) + System.out.println(records[i]+","+result[j]); + switch (schemas.get(i).getType()) { + case BYTES: + if (isValueInArray(i, columnIndexes)) { + byte[] e = (byte[]) records[i]; + byte[] r = (byte[]) result[j]; + Assert.assertArrayEquals(e, r); + j++; + } + break; + case BOOLEANLIST: + case STRINGLIST: + case DOUBLELIST: + case FLOATLIST: + case INTEGERLIST: + case LONGLIST: + if (isValueInArray(i, columnIndexes)) { + val e_val = records[i]; + val r_cal = result[j]; + Assert.assertTrue(e_val.equals(r_cal)); + j++; + } + break; + default: + if (isValueInArray(i, columnIndexes)) { + Assert.assertEquals(records[i], result[j]); + j++; + } + break; + } + } + } + */ + + @Test + public void testDecodeWithColumnIndexesV2() { + + Object[] records = getRecords(); + recordEncoder.setCodecVersion( 0x2 ); KeyValue kv = recordEncoder.encode(records); @@ -254,6 +349,16 @@ private static List getTable() { testLongList2.setAllowNull(true); testLongList2.setIsKey(false); + DingoSchema emptyStr1 = new StringSchema(); + emptyStr1.setIndex(24); + emptyStr1.setAllowNull(false); + emptyStr1.setIsKey(false); + + DingoSchema emptyStr2 = new StringSchema(); + emptyStr2.setIndex(25); + emptyStr2.setAllowNull(false); + emptyStr2.setIsKey(false); + List table = new ArrayList<>(); table.add(id); table.add(name); @@ -279,6 +384,8 @@ private static List getTable() { table.add(testIntegerList2); table.add(testLongList1); table.add(testLongList2); + table.add(emptyStr1); + table.add(emptyStr2); return table; } @@ -365,7 +472,9 @@ private static Object[] getRecords() { i1, i2, l1, - l2 + l2, + "", + "abcdef" }; return record; } diff --git a/java/dingo-sdk/src/test/java/sdk/common/serial/TestListType.java b/java/dingo-sdk/src/test/java/sdk/common/serial/TestListType.java index 6dd4dad58..157b5121d 100644 --- a/java/dingo-sdk/src/test/java/sdk/common/serial/TestListType.java +++ b/java/dingo-sdk/src/test/java/sdk/common/serial/TestListType.java @@ -30,6 +30,8 @@ public class TestListType { public static void main(String[] args) { List table = getTable(); RecordEncoder re = new RecordEncoder(0, table, 0L); + //re.setCodecVersion(0x01); + RecordDecoder rd = new RecordDecoder(0, table, 0L); List records = getRecords(); @@ -59,6 +61,7 @@ public static void main(String[] args) { int[] index = new int[]{4,5,10}; long tag3 = System.currentTimeMillis(); for (KeyValue kv : kvs) { + //rd.decodeValue(kv, index); rd.decodeValue(kv, index); } long tag4 = System.currentTimeMillis(); @@ -276,7 +279,7 @@ private static List getRecords() { }; List allRecord = new ArrayList<>(); - for (int i = 0; i < 1000000; i ++) { + for (int i = 0; i < 10000; i ++) { Object[] r = new Object[16]; System.arraycopy(record, 1, r, 1, 15); r[0] = i; diff --git a/src/coprocessor/coprocessor.cc b/src/coprocessor/coprocessor.cc index 62265ffa4..0cd075752 100755 --- a/src/coprocessor/coprocessor.cc +++ b/src/coprocessor/coprocessor.cc @@ -351,7 +351,7 @@ butil::Status Coprocessor::DoExecute(const pb::common::KeyValue& kv, bool* has_r runner.Decode(reinterpret_cast(coprocessor_.expression().c_str()), coprocessor_.expression().length()); auto tuple = std::make_unique(); - RelExprHelper::TransToOperandWrapper(original_serial_schemas_, selection_column_indexes_, original_record, tuple); + RelExprHelper::TransToOperandWrapper(0x02, original_serial_schemas_, selection_column_indexes_, original_record, tuple); runner.BindTuple(tuple.get()); runner.Run(); std::optional ok = runner.GetOptional(); diff --git a/src/coprocessor/coprocessor_scalar.cc b/src/coprocessor/coprocessor_scalar.cc index 88e82a573..55a353328 100755 --- a/src/coprocessor/coprocessor_scalar.cc +++ b/src/coprocessor/coprocessor_scalar.cc @@ -103,7 +103,7 @@ butil::Status CoprocessorScalar::Filter(const pb::common::VectorScalardata& scal std::unique_ptr> result_operand_ptr; - status = DoRelExprCore(original_record, result_operand_ptr); + status = DoRelExprCore(0x02, original_record, result_operand_ptr); if (!status.ok()) { DINGO_LOG(ERROR) << status.error_cstr(); return status; diff --git a/src/coprocessor/coprocessor_v2.cc b/src/coprocessor/coprocessor_v2.cc index 2d4c02c50..dfe957e80 100755 --- a/src/coprocessor/coprocessor_v2.cc +++ b/src/coprocessor/coprocessor_v2.cc @@ -257,6 +257,9 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max bool has_result_kv = false; pb::common::KeyValue result_key_value; DINGO_LOG(DEBUG) << fmt::format("CoprocessorV2::DoExecute Call"); + + //Get codec version from key. + //int codec_version = GetCodecVersion(kv.key); status = DoExecute(kv.key(), kv.value(), &has_result_kv, &result_key_value); if (!status.ok()) { DINGO_LOG(ERROR) << fmt::format("CoprocessorV2::Execute failed"); @@ -447,7 +450,8 @@ butil::Status CoprocessorV2::DoExecute(const std::string& key, const std::string trans_field_spend_time_ms += lambda_time_diff_microseconds_function(trans_start, trans_end); }); #endif - status = RelExprHelper::TransFromOperandWrapper(result_operand_ptr, result_serial_schemas_, result_column_indexes_, + int codec_version = GetCodecVersion(key); + status = RelExprHelper::TransFromOperandWrapper(codec_version, result_operand_ptr, result_serial_schemas_, result_column_indexes_, result_record); if (!status.ok()) { DINGO_LOG(ERROR) << status.error_cstr(); @@ -495,7 +499,7 @@ butil::Status CoprocessorV2::DoFilter(const std::string& key, const std::string& return status; } -butil::Status CoprocessorV2::DoRelExprCore(const std::vector& original_record, +butil::Status CoprocessorV2::DoRelExprCore(int codec_version, const std::vector& original_record, std::unique_ptr>& result_operand_ptr) { butil::Status status; @@ -516,7 +520,7 @@ butil::Status CoprocessorV2::DoRelExprCore(const std::vector& original }); #endif - status = RelExprHelper::TransToOperandWrapper(original_serial_schemas_, selection_column_indexes_, original_record, + status = RelExprHelper::TransToOperandWrapper(codec_version, original_serial_schemas_, selection_column_indexes_, original_record, operand_ptr); if (!status.ok()) { DINGO_LOG(ERROR) << status.error_cstr(); @@ -554,6 +558,8 @@ butil::Status CoprocessorV2::DoRelExprCoreWrapper(const std::string& key, const butil::Status status; std::vector original_record; + int codec_version = GetCodecVersion(key); + #if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) { auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; @@ -584,8 +590,7 @@ butil::Status CoprocessorV2::DoRelExprCoreWrapper(const std::string& key, const #if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } #endif - - return DoRelExprCore(original_record, result_operand_ptr); + return DoRelExprCore(codec_version, original_record, result_operand_ptr); } butil::Status CoprocessorV2::GetKvFromExprEndOfFinish(std::vector* kvs) { @@ -633,7 +638,8 @@ butil::Status CoprocessorV2::GetKvFromExprEndOfFinish(std::vector& original_record, + butil::Status DoRelExprCore(int codec_version, const std::vector& original_record, std::unique_ptr>& result_operand_ptr); // NOLINT butil::Status DoRelExprCoreWrapper(const std::string& key, const std::string& value, std::unique_ptr>& result_operand_ptr); // NOLINT @@ -91,6 +91,14 @@ class CoprocessorV2 : public RawCoprocessor { void ShowSelectionColumnIndexes(); void ShowResultColumnIndexes(); + int GetCodecVersion(const std::string& key) { + if (key.empty()) { + throw std::runtime_error("key should not be empty in func GetCodecVersion."); + } + + return key.c_str()[key.length() - 1]; + } + char prefix_; // NOLINT pb::common::CoprocessorV2 coprocessor_; // NOLINT std::shared_ptr>> original_serial_schemas_; // NOLINT diff --git a/src/coprocessor/rel_expr_helper.cc b/src/coprocessor/rel_expr_helper.cc old mode 100755 new mode 100644 index 3d9db12a5..5e85a572c --- a/src/coprocessor/rel_expr_helper.cc +++ b/src/coprocessor/rel_expr_helper.cc @@ -15,16 +15,37 @@ #include "coprocessor/rel_expr_helper.h" #include +#include #include "common/logging.h" #include "fmt/core.h" -#include "serial/schema/base_schema.h" #include "proto/error.pb.h" +#include "serial/schema/base_schema.h" +#include "serial/record/V2/common.h" namespace dingodb { -butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::any& column, - std::unique_ptr>& operand_ptr) { +template +expr::Operand ToOperandV2(const std::any& v) { + if (v.has_value()) { + return std::any_cast(v); + } + return nullptr; +} + +template +std::any FromOperandV2(const expr::Operand& v) { + if (v != nullptr) { + auto opt = v.GetValue(); + return std::make_any(opt); + } else { + return std::any(); + } +} + +butil::Status RelExprHelper::TransToOperand( + BaseSchema::Type type, const std::any& column, + std::unique_ptr>& operand_ptr) { if (!operand_ptr) { std::string s = fmt::format("operand_ptr is nullptr. not support"); DINGO_LOG(ERROR) << s; @@ -34,9 +55,11 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an switch (type) { case BaseSchema::Type::kBool: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand(column)); + operand_ptr->emplace_back( + expr::any_optional_data_adaptor::ToOperand(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional failed", bad.what()); + std::string s = + fmt::format("{} any_cast std::optional failed", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -44,9 +67,11 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kInteger: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand(column)); + operand_ptr->emplace_back( + expr::any_optional_data_adaptor::ToOperand(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional failed", bad.what()); + std::string s = fmt::format( + "{} any_cast std::optional failed", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -54,9 +79,11 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kFloat: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand(column)); + operand_ptr->emplace_back( + expr::any_optional_data_adaptor::ToOperand(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional failed", bad.what()); + std::string s = + fmt::format("{} any_cast std::optional failed", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -64,10 +91,12 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kLong: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand(column)); + operand_ptr->emplace_back( + expr::any_optional_data_adaptor::ToOperand(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional failed", bad.what()); + std::string s = fmt::format( + "{} any_cast std::optional failed", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -75,9 +104,11 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kDouble: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand(column)); + operand_ptr->emplace_back( + expr::any_optional_data_adaptor::ToOperand(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional failed", bad.what()); + std::string s = fmt::format("{} any_cast std::optional failed", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -85,9 +116,12 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kString: { try { - operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand>(column)); + operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>(column)); } catch (const std::bad_any_cast& bad) { - std::string s = fmt::format("{} any_cast std::optional> failed", bad.what()); + std::string s = fmt::format( + "{} any_cast std::optional> failed", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -95,8 +129,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kBoolList: { try { - operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -107,7 +141,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an case BaseSchema::Type::kIntegerList: { try { operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -117,8 +152,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an } case BaseSchema::Type::kFloatList: { try { - operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + operand_ptr->emplace_back(expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -129,7 +164,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an case BaseSchema::Type::kLongList: { try { operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -140,7 +176,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an case BaseSchema::Type::kDoubleList: { try { operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -151,7 +188,206 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an case BaseSchema::Type::kStringList: { try { operand_ptr->emplace_back( - expr::any_optional_data_adaptor::ToOperand>>(column)); + expr::any_optional_data_adaptor::ToOperand< + std::shared_ptr>>(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + default: { + std::string s = fmt::format("CloneColumn unsupported type {}", + BaseSchema::GetTypeString(type)); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + } + + return butil::Status(); +} + +butil::Status RelExprHelper::TransToOperandV2( + BaseSchema::Type type, const std::any& column, + std::unique_ptr>& operand_ptr) { + if (!operand_ptr) { + std::string s = fmt::format("operand_ptr is nullptr. not support"); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + + const std::any emptyOperand = std::any(); + + switch (type) { + case BaseSchema::Type::kBool: { + try { + operand_ptr->emplace_back(ToOperandV2(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = + fmt::format("{} any_cast std::optional failed", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kInteger: { + try { + operand_ptr->emplace_back(ToOperandV2(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format( + "{} any_cast std::optional failed", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kFloat: { + try { + operand_ptr->emplace_back(ToOperandV2(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = + fmt::format("{} any_cast std::optional failed", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kLong: { + try { + operand_ptr->emplace_back(ToOperandV2(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format( + "{} any_cast std::optional failed", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kDouble: { + try { + operand_ptr->emplace_back(ToOperandV2(column)); + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("{} any_cast std::optional failed", + bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kString: { + try { + if (column.has_value()) { + auto col_value = std::make_shared( + std::any_cast(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format( + "{} any_cast std::optional> failed", + bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kBoolList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kIntegerList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kFloatList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kLongList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kDoubleList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + + } catch (const std::bad_any_cast& bad) { + std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kStringList: { + try { + if (column.has_value()) { + auto col_value = std::make_shared>( + std::any_cast>(column)); + operand_ptr->emplace_back(col_value); + } else { + operand_ptr->emplace_back(nullptr); + } + } catch (const std::bad_any_cast& bad) { std::string s = fmt::format("Trans to Operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; @@ -160,7 +396,8 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an break; } default: { - std::string s = fmt::format("CloneColumn unsupported type {}", BaseSchema::GetTypeString(type)); + std::string s = fmt::format("CloneColumn unsupported type {}", + BaseSchema::GetTypeString(type)); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -169,9 +406,10 @@ butil::Status RelExprHelper::TransToOperand(BaseSchema::Type type, const std::an return butil::Status(); } -butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, - const std::unique_ptr>& operand_ptr, - size_t index, std::vector& columns) { +butil::Status RelExprHelper::TransFromOperand( + BaseSchema::Type type, + const std::unique_ptr>& operand_ptr, + size_t index, std::vector& columns) { if (!operand_ptr) { std::string s = fmt::format("operand_ptr is nullptr. not support"); DINGO_LOG(ERROR) << s; @@ -181,10 +419,12 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, switch (type) { case BaseSchema::Type::kBool: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand((*operand_ptr)[index])); + columns.emplace_back(expr::any_optional_data_adaptor::FromOperand( + (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = fmt::format( + "Operand to std::any> failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -192,9 +432,13 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } case BaseSchema::Type::kInteger: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand((*operand_ptr)[index])); + columns.emplace_back( + expr::any_optional_data_adaptor::FromOperand( + (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = fmt::format( + "Operand to std::any> failed, {}", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -202,9 +446,12 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } case BaseSchema::Type::kFloat: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand((*operand_ptr)[index])); + columns.emplace_back( + expr::any_optional_data_adaptor::FromOperand( + (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = fmt::format( + "Operand to std::any> failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -212,9 +459,13 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } case BaseSchema::Type::kLong: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand((*operand_ptr)[index])); + columns.emplace_back( + expr::any_optional_data_adaptor::FromOperand( + (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = fmt::format( + "Operand to std::any> failed, {}", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -222,9 +473,13 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } case BaseSchema::Type::kDouble: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand((*operand_ptr)[index])); + columns.emplace_back( + expr::any_optional_data_adaptor::FromOperand( + (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = + fmt::format("Operand to std::any> failed, {}", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -233,9 +488,12 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kString: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Operand to std::any> failed, {}", bad.what()); + std::string s = fmt::format( + "Operand to std::any> failed, {}", + bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -244,9 +502,11 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kBoolList: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -255,9 +515,11 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kIntegerList: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -266,9 +528,11 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kFloatList: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -277,9 +541,11 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kLongList: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -288,9 +554,11 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, case BaseSchema::Type::kDoubleList: { try { columns.emplace_back( - expr::any_optional_data_adaptor::FromOperand>>((*operand_ptr)[index])); + expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>((*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -298,17 +566,222 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } case BaseSchema::Type::kStringList: { try { - columns.emplace_back(expr::any_optional_data_adaptor::FromOperand>>( + columns.emplace_back(expr::any_optional_data_adaptor::FromOperand< + std::shared_ptr>>( (*operand_ptr)[index])); } catch (const std::bad_variant_access& bad) { - std::string s = fmt::format("Trans from operand failed, {}", bad.what()); + std::string s = + fmt::format("Trans from operand failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + default: { + std::string s = fmt::format("CloneColumn unsupported type {}", + BaseSchema::GetTypeString(type)); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + } + + return butil::Status(); +} + +butil::Status RelExprHelper::TransFromOperandV2( + BaseSchema::Type type, + const std::unique_ptr>& operand_ptr, + size_t index, std::vector& columns) { + if (!operand_ptr) { + std::string s = fmt::format("operand_ptr is nullptr. not support"); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + + switch (type) { + case BaseSchema::Type::kBool: { + try { + columns.emplace_back(FromOperandV2((*operand_ptr)[index])); + } catch (const std::bad_variant_access& bad) { + std::string s = + fmt::format("Operand V2 to bool failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kInteger: { + try { + columns.emplace_back(FromOperandV2((*operand_ptr)[index])); + } catch (const std::bad_variant_access& bad) { + std::string s = + fmt::format("Operand to int32_t failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kFloat: { + try { + columns.emplace_back(FromOperandV2((*operand_ptr)[index])); + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format("Operand to float failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kLong: { + try { + columns.emplace_back(FromOperandV2((*operand_ptr)[index])); + } catch (const std::bad_variant_access& bad) { + std::string s = + fmt::format("Operand to int64_t failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kDouble: { + try { + columns.emplace_back(FromOperandV2((*operand_ptr)[index])); + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format("Operand to double failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kString: { + try { + std::shared_ptr operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = + fmt::format("Operand to std::string failed, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kBoolList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failed for bool list, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kIntegerList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failedfor integer list, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kFloatList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failed for float list, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kLongList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failed for long list, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kDoubleList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>((*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failed for double list, {}", bad.what()); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + break; + } + case BaseSchema::Type::kStringList: { + try { + std::shared_ptr> operand_value = + expr::any_optional_data_adaptor::FromOperandV2< + std::shared_ptr>>( + (*operand_ptr)[index]); + if (operand_value) { + columns.emplace_back(std::any(std::move(*operand_value))); + } else { + columns.emplace_back(std::any()); + } + } catch (const std::bad_variant_access& bad) { + std::string s = fmt::format( + "Trans from operand failed for string list, {}", bad.what()); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } break; } default: { - std::string s = fmt::format("CloneColumn unsupported type {}", BaseSchema::GetTypeString(type)); + std::string s = fmt::format("CloneColumn unsupported type {}", + BaseSchema::GetTypeString(type)); DINGO_LOG(ERROR) << s; return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); } @@ -318,39 +791,78 @@ butil::Status RelExprHelper::TransFromOperand(BaseSchema::Type type, } butil::Status RelExprHelper::TransToOperandWrapper( - const std::shared_ptr>>& original_serial_schemas, - const std::vector& selection_column_indexes, const std::vector& original_record, + const int codec_version, + const std::shared_ptr>>& + original_serial_schemas, + const std::vector& selection_column_indexes, + const std::vector& original_record, std::unique_ptr>& operand_ptr) { butil::Status status; size_t i = 0; - for (const auto& record : original_record) { - BaseSchema::Type type = (*original_serial_schemas)[selection_column_indexes[i++]]->GetType(); - status = RelExprHelper::TransToOperand(type, record, operand_ptr); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; + if (codec_version <= dingodb::serialV2::CODEC_VERSION_V1) { + for (const auto& record : original_record) { + BaseSchema::Type type = + (*original_serial_schemas)[selection_column_indexes[i++]]->GetType(); + + status = RelExprHelper::TransToOperand(type, record, operand_ptr); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } + } + } else { + for (const auto& record : original_record) { + BaseSchema::Type type = + (*original_serial_schemas)[selection_column_indexes[i++]]->GetType(); + + status = RelExprHelper::TransToOperandV2(type, record, operand_ptr); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } } } return butil::Status(); } butil::Status RelExprHelper::TransFromOperandWrapper( + const int codec_version, const std::unique_ptr>& operand_ptr, - const std::shared_ptr>>& result_serial_schemas, - const std::vector& result_column_indexes, std::vector& result_record) { + const std::shared_ptr>>& + result_serial_schemas, + const std::vector& result_column_indexes, + std::vector& result_record) { butil::Status status; size_t i = 0; - for (const auto& tuple : *operand_ptr) { - BaseSchema::Type type = (*result_serial_schemas)[result_column_indexes[i]]->GetType(); - status = RelExprHelper::TransFromOperand(type, operand_ptr, i, result_record); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; - } - i++; + + if (codec_version <= dingodb::serialV2::CODEC_VERSION_V1) { // codec v1 for 0 or 1. + for (const auto& tuple : *operand_ptr) { + BaseSchema::Type type = + (*result_serial_schemas)[result_column_indexes[i]]->GetType(); + status = + RelExprHelper::TransFromOperand(type, operand_ptr, i, result_record); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } + i++; + } + } else { // codec v2. + for (const auto& tuple : *operand_ptr) { + BaseSchema::Type type = + (*result_serial_schemas)[result_column_indexes[i]]->GetType(); + status = RelExprHelper::TransFromOperandV2(type, operand_ptr, i, + result_record); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } + i++; + } } + return butil::Status(); } diff --git a/src/coprocessor/rel_expr_helper.h b/src/coprocessor/rel_expr_helper.h index 96f619071..439ddfa9a 100755 --- a/src/coprocessor/rel_expr_helper.h +++ b/src/coprocessor/rel_expr_helper.h @@ -93,12 +93,21 @@ class RelExprHelper { const std::unique_ptr>& operand_ptr, size_t index, std::vector& columns); // NOLINT + static butil::Status TransToOperandV2(BaseSchema::Type type, const std::any& column, + std::unique_ptr>& operand_ptr); // NOLINT + + static butil::Status TransFromOperandV2(BaseSchema::Type type, + const std::unique_ptr>& operand_ptr, size_t index, + std::vector& columns); // NOLINT + static butil::Status TransToOperandWrapper( + const int codec_version, const std::shared_ptr>>& original_serial_schemas, const std::vector& selection_column_indexes, const std::vector& original_record, std::unique_ptr>& operand_ptr); // NOLINT static butil::Status TransFromOperandWrapper( + const int codec_version, const std::unique_ptr>& operand_ptr, const std::shared_ptr>>& result_serial_schemas, const std::vector& result_column_indexes, std::vector& result_record); diff --git a/src/libexpr b/src/libexpr index 3e465d30b..0a40d235c 160000 --- a/src/libexpr +++ b/src/libexpr @@ -1 +1 @@ -Subproject commit 3e465d30b35cc3c8122919f8eea7621100ee2b22 +Subproject commit 0a40d235cbfc06f0732a5eaf2c16287f5574a099 diff --git a/src/serial b/src/serial index e707d72e7..1e0e53b2d 160000 --- a/src/serial +++ b/src/serial @@ -1 +1 @@ -Subproject commit e707d72e7b9b8276e5a70f0b0551bd6ce0989b3e +Subproject commit 1e0e53b2deb4e78a5f997f62989cba4fbcaa8cb6 diff --git a/test/unit_test/common/test_rel_expr_helper.cc b/test/unit_test/common/test_rel_expr_helper.cc index 8267b028f..7c2b52da5 100644 --- a/test/unit_test/common/test_rel_expr_helper.cc +++ b/test/unit_test/common/test_rel_expr_helper.cc @@ -806,7 +806,7 @@ TEST_F(RelExprHelperTest, TransToOperandWrapper) { std::vector original_record; std::unique_ptr> operand_ptr; - ok = RelExprHelper::TransToOperandWrapper(original_serial_schemas, selection_column_indexes, original_record, + ok = RelExprHelper::TransToOperandWrapper(0x01, original_serial_schemas, selection_column_indexes, original_record, operand_ptr); EXPECT_EQ(ok.error_code(), pb::error::OK); } @@ -823,7 +823,7 @@ TEST_F(RelExprHelperTest, TransToOperandWrapper) { ok = Utils::TransToSerialSchema(pb_schemas, &original_serial_schemas); EXPECT_EQ(ok.error_code(), pb::error::OK); - ok = RelExprHelper::TransToOperandWrapper(original_serial_schemas, selection_column_indexes, original_record, + ok = RelExprHelper::TransToOperandWrapper(0x01, original_serial_schemas, selection_column_indexes, original_record, operand_ptr); EXPECT_EQ(ok.error_code(), pb::error::OK); } @@ -896,7 +896,7 @@ TEST_F(RelExprHelperTest, TransToOperandWrapper) { original_record.emplace_back(std::optional(1.23f)); original_record.emplace_back(std::optional(10)); original_record.emplace_back(std::optional(true)); - ok = RelExprHelper::TransToOperandWrapper(original_serial_schemas, selection_column_indexes, original_record, + ok = RelExprHelper::TransToOperandWrapper(0x01, original_serial_schemas, selection_column_indexes, original_record, operand_ptr); EXPECT_EQ(ok.error_code(), pb::error::OK); @@ -971,7 +971,7 @@ TEST_F(RelExprHelperTest, TransFromOperandWrapper) { std::vector result_column_indexes; std::vector result_record; - ok = RelExprHelper::TransFromOperandWrapper(operand_ptr, result_serial_schemas, result_column_indexes, + ok = RelExprHelper::TransFromOperandWrapper(0x01, operand_ptr, result_serial_schemas, result_column_indexes, result_record); } @@ -1095,7 +1095,7 @@ TEST_F(RelExprHelperTest, TransFromOperandWrapper) { result_column_indexes.push_back(4); result_column_indexes.push_back(5); - ok = RelExprHelper::TransFromOperandWrapper(operand_ptr, result_serial_schemas, result_column_indexes, + ok = RelExprHelper::TransFromOperandWrapper(0x01, operand_ptr, result_serial_schemas, result_column_indexes, result_record); for (const auto &record : result_record) { @@ -1267,7 +1267,7 @@ TEST_F(RelExprHelperTest, TransFromOperandWrapper) { result_column_indexes.push_back(1); result_column_indexes.push_back(0); - ok = RelExprHelper::TransFromOperandWrapper(operand_ptr, result_serial_schemas, result_column_indexes, + ok = RelExprHelper::TransFromOperandWrapper(0x01, operand_ptr, result_serial_schemas, result_column_indexes, result_record); for (const auto &record : result_record) {