diff --git a/.github/workflows/ci-integration-process.yaml b/.github/workflows/ci-integration-process.yaml index 8b9799d5ea2de..474a552b3a1ce 100644 --- a/.github/workflows/ci-integration-process.yaml +++ b/.github/workflows/ci-integration-process.yaml @@ -52,8 +52,8 @@ jobs: - name: clean disk if: steps.docs.outputs.changed_only == 'no' run: | - sudo swapoff -a - sudo rm -f /swapfile + sudo swapoff /swapfile + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc sudo apt clean docker rmi $(docker images -q) -f df -h diff --git a/.github/workflows/ci-integration-thread.yaml b/.github/workflows/ci-integration-thread.yaml index f4eef6bc89c1c..fae037d81f090 100644 --- a/.github/workflows/ci-integration-thread.yaml +++ b/.github/workflows/ci-integration-thread.yaml @@ -52,8 +52,8 @@ jobs: - name: clean disk if: steps.docs.outputs.changed_only == 'no' run: | - sudo swapoff -a - sudo rm -f /swapfile + sudo swapoff /swapfile + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc sudo apt clean docker rmi $(docker images -q) -f df -h diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml index a9841efe24355..e555d0f805165 100644 --- a/.github/workflows/ci-unit-broker.yml +++ b/.github/workflows/ci-unit-broker.yml @@ -49,21 +49,20 @@ jobs: with: java-version: 1.8 + - name: Set up Maven + uses: apache/pulsar-test-infra/setup-maven@master + if: steps.docs.outputs.changed_only == 'no' + with: + maven-version: 3.6.1 + - name: clean disk if: steps.docs.outputs.changed_only == 'no' run: | - sudo swapoff -a - sudo rm -f /swapfile + sudo swapoff /swapfile + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc sudo apt clean docker rmi $(docker images -q) -f df -h - free -h - - - name: Set up Maven - uses: apache/pulsar-test-infra/setup-maven@master - if: steps.docs.outputs.changed_only == 'no' - with: - maven-version: 3.6.1 - name: run unit tests install by skip tests if: steps.docs.outputs.changed_only == 'no' diff --git a/.github/workflows/ci-unit.yaml b/.github/workflows/ci-unit.yaml index 6ce8c60c1d329..b8d21c4e93177 100644 --- a/.github/workflows/ci-unit.yaml +++ b/.github/workflows/ci-unit.yaml @@ -55,6 +55,15 @@ jobs: with: maven-version: 3.6.1 + - name: clean disk + if: steps.docs.outputs.changed_only == 'no' + run: | + sudo swapoff /swapfile + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc + sudo apt clean + docker rmi $(docker images -q) -f + df -h + - name: run unit tests install by skip tests if: steps.docs.outputs.changed_only == 'no' run: mvn clean install -DskipTests diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java index e04cf49066eaa..e588a07919020 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java @@ -42,6 +42,13 @@ static SchemaDefinitionBuilder builder() { */ boolean getAlwaysAllowNull(); + /** + * Get JSR310 conversion enabled. + * + * @return return true if enable JSR310 conversion. false means use Joda time conversion. + */ + boolean isJsr310ConversionEnabled(); + /** * Get schema class. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java index 7fd0f9de43b6d..16f779beba1fc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java @@ -35,6 +35,22 @@ public interface SchemaDefinitionBuilder { */ SchemaDefinitionBuilder withAlwaysAllowNull(boolean alwaysAllowNull); + /** + * Set schema use JRS310 conversion or not. + * + *

Before Avro 1.9 the Joda time library was used for handling the logical date(time) values. + * But since the introduction of Java8 the Java Specification Request (JSR) 310 has been included, + * which greatly improves the handling of date and time natively. To keep forwarding compatibility, + * default is use Joda time conversion. + * + *

JSR310 conversion is recommended here. Joda time conversion is has been marked deprecated. + * In future versions, joda time conversion may be removed + * + * @param jsr310ConversionEnabled use JRS310 conversion or not, default is false for keep forwarding compatibility + * @return schema definition builder + */ + SchemaDefinitionBuilder withJSR310ConversionEnabled(boolean jsr310ConversionEnabled); + /** * Set schema info properties. * diff --git a/pulsar-client-messagecrypto-bc/pom.xml b/pulsar-client-messagecrypto-bc/pom.xml index 73e1f3148aeb8..0f7b190931e01 100644 --- a/pulsar-client-messagecrypto-bc/pom.xml +++ b/pulsar-client-messagecrypto-bc/pom.xml @@ -46,6 +46,5 @@ bouncy-castle-bc-shaded ${project.parent.version} - diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 0981f4663b9ff..7bd751a4aea6f 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -129,6 +129,12 @@ + + joda-time + joda-time + provided + + com.google.protobuf protobuf-java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index 4049f14b78fa0..3e35cb21b287f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Conversions; +import org.apache.avro.data.JodaTimeConversions; import org.apache.avro.data.TimeConversions; import org.apache.avro.reflect.ReflectData; import org.apache.pulsar.client.api.Schema; @@ -42,36 +43,14 @@ public class AvroSchema extends StructSchema { private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class); - // the aim to fix avro's bug - // https://issues.apache.org/jira/browse/AVRO-1891 bug address explain - // fix the avro logical type read and write - static { - ReflectData reflectDataAllowNull = ReflectData.AllowNull.get(); - - reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion()); - reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion()); - reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); - - ReflectData reflectDataNotAllowNull = ReflectData.get(); - - reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion()); - reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion()); - reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); - } - private ClassLoader pojoClassLoader; private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) { super(schemaInfo); this.pojoClassLoader = pojoClassLoader; - setReader(new AvroReader<>(schema, pojoClassLoader)); - setWriter(new AvroWriter<>(schema)); + boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo); + setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled)); + setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled)); } @Override @@ -116,7 +95,8 @@ protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) { log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}", SchemaUtils.getStringSchemaVersion(schemaVersion.get()), schemaInfo.getSchemaDefinition(), schemaInfo.toString()); - return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader); + boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo); + return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled); } else { log.warn("No schema found for version({}), use latest schema : {}", SchemaUtils.getStringSchemaVersion(schemaVersion.get()), @@ -125,4 +105,30 @@ protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) { } } + private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) { + if (schemaInfo != null) { + return Boolean.parseBoolean(schemaInfo.getProperties() + .getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false")); + } + return false; + } + + public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) { + reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion()); + reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); + reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); + reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + if (jsr310ConversionEnabled) { + reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + } else { + try { + Class.forName("org.joda.time.DateTime"); + reflectData.addLogicalTypeConversion(new JodaTimeConversions.TimestampConversion()); + } catch (ClassNotFoundException e) { + // Skip if have not provide joda-time dependency. + } + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java index 2a0cab72efb05..f5f52e855939d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java @@ -33,11 +33,13 @@ public class SchemaDefinitionBuilderImpl implements SchemaDefinitionBuilder { public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull"; + public static final String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled"; /** * the schema definition class */ private Class clazz; + /** * The flag of schema type always allow null * @@ -48,6 +50,13 @@ public class SchemaDefinitionBuilderImpl implements SchemaDefinitionBuilder withAlwaysAllowNull(boolean alwaysAllowNull) { return this; } + @Override + public SchemaDefinitionBuilder withJSR310ConversionEnabled(boolean jsr310ConversionEnabled) { + this.jsr310ConversionEnabled = jsr310ConversionEnabled; + return this; + } + @Override public SchemaDefinitionBuilder addProperty(String key, String value) { this.properties.put(key, value); @@ -107,8 +122,10 @@ public SchemaDefinition build() { checkArgument(!(StringUtils.isNotBlank(jsonDef) && clazz != null), "Not allowed to set pojo and jsonDef both for the schema definition."); - properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false"); - return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning); + properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull)); + properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled)); + return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning, + jsr310ConversionEnabled); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java index 8ace2469212a1..7ff0ebac524e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java @@ -42,20 +42,24 @@ public class SchemaDefinitionImpl implements SchemaDefinition{ * false you can define the field by yourself by the annotation@Nullable * */ - private boolean alwaysAllowNull; + private final boolean alwaysAllowNull; - private Map properties; + private final Map properties; - private String jsonDef; + private final String jsonDef; - private boolean supportSchemaVersioning; + private final boolean supportSchemaVersioning; - public SchemaDefinitionImpl(Class pojo, String jsonDef, boolean alwaysAllowNull, Map properties, boolean supportSchemaVersioning) { + private final boolean jsr310ConversionEnabled; + + public SchemaDefinitionImpl(Class pojo, String jsonDef, boolean alwaysAllowNull, Map properties, + boolean supportSchemaVersioning, boolean jsr310ConversionEnabled) { this.alwaysAllowNull = alwaysAllowNull; this.properties = properties; this.jsonDef = jsonDef; this.pojo = pojo; this.supportSchemaVersioning = supportSchemaVersioning; + this.jsr310ConversionEnabled = jsr310ConversionEnabled; } /** * get schema whether always allow null or not @@ -66,6 +70,11 @@ public boolean getAlwaysAllowNull() { return alwaysAllowNull; } + @Override + public boolean isJsr310ConversionEnabled() { + return jsr310ConversionEnabled; + } + /** * Get json schema definition * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java index 8481542b17fad..e1b58a42e720b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java @@ -18,9 +18,7 @@ */ package org.apache.pulsar.client.impl.schema.reader; -import org.apache.avro.Conversions; import org.apache.avro.Schema; -import org.apache.avro.data.TimeConversions; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.reflect.ReflectData; @@ -28,6 +26,7 @@ import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.apache.pulsar.client.impl.schema.AvroSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,30 +43,21 @@ public AvroReader(Schema schema) { this.reader = new ReflectDatumReader<>(schema); } - public AvroReader(Schema schema, ClassLoader classLoader) { + public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) { if (classLoader != null) { ReflectData reflectData = new ReflectData(classLoader); - reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); this.reader = new ReflectDatumReader<>(schema, schema, reflectData); } else { this.reader = new ReflectDatumReader<>(schema); } } - public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) { + public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader, + boolean jsr310ConversionEnabled) { if (classLoader != null) { ReflectData reflectData = new ReflectData(classLoader); - reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData); } else { this.reader = new ReflectDatumReader<>(writerSchema, readerSchema); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java index 912bca72d2658..41ea50fad8c15 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java @@ -21,9 +21,11 @@ import org.apache.avro.Schema; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaWriter; +import org.apache.pulsar.client.impl.schema.AvroSchema; import java.io.ByteArrayOutputStream; @@ -33,9 +35,15 @@ public class AvroWriter implements SchemaWriter { private ByteArrayOutputStream byteArrayOutputStream; public AvroWriter(Schema schema) { + this(schema, false); + } + + public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) { this.byteArrayOutputStream = new ByteArrayOutputStream(); this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder); - this.writer = new ReflectDatumWriter<>(schema); + ReflectData reflectData = new ReflectData(); + AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); + this.writer = new ReflectDatumWriter<>(schema, reflectData); } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index 0aa45aff4356b..49d5da998402e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -53,6 +53,8 @@ import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; import org.json.JSONException; import org.skyscreamer.jsonassert.JSONAssert; import org.testng.Assert; @@ -99,6 +101,20 @@ private static class SchemaLogicalType{ long timeMicros; } + @Data + private static class JodaTimeLogicalType{ + @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}") + LocalDate date; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}") + DateTime timestampMillis; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}") + LocalTime timeMillis; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}") + long timestampMicros; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}") + long timeMicros; + } + @Test public void testSchemaDefinition() throws SchemaValidationException { org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class); @@ -251,7 +267,8 @@ public void testAllowNullEncodeAndDecode() { @Test public void testLogicalType() { - AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaLogicalType.class).build()); + AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder() + .withPojo(SchemaLogicalType.class).withJSR310ConversionEnabled(true).build()); SchemaLogicalType schemaLogicalType = new SchemaLogicalType(); schemaLogicalType.setTimestampMicros(System.currentTimeMillis()*1000); @@ -270,6 +287,25 @@ public void testLogicalType() { } + @Test + public void testJodaTimeLogicalType() { + AvroSchema avroSchema = AvroSchema.of(SchemaDefinition.builder() + .withPojo(JodaTimeLogicalType.class).build()); + JodaTimeLogicalType schemaLogicalType = new JodaTimeLogicalType(); + schemaLogicalType.setTimestampMicros(System.currentTimeMillis()*1000); + schemaLogicalType.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC())); + schemaLogicalType.setDate(LocalDate.now()); + schemaLogicalType.setTimeMicros(System.currentTimeMillis()*1000); + schemaLogicalType.setTimeMillis(LocalTime.now()); + + byte[] bytes1 = avroSchema.encode(schemaLogicalType); + Assert.assertTrue(bytes1.length > 0); + + JodaTimeLogicalType object1 = avroSchema.decode(bytes1); + + assertEquals(object1, schemaLogicalType); + } + @Test public void testDateAndTimestamp() { RecordSchemaBuilder recordSchemaBuilder = diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java index abcfaf2ce16e6..4c353565202af 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java @@ -48,7 +48,7 @@ public class ProtobufSchemaTest { "\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," + "\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}"; - private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__PARSING_INFO__\":" + + private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"__PARSING_INFO__\":" + "\"[{\\\"number\\\":1,\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"" + "LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\"" + ":\\\"DOUBLE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,\\\"name\\\"" + diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java index 7678be46e2135..cb65e42c416b2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java @@ -61,6 +61,7 @@ public class SchemaInfoTest { + " \"type\": \"JSON\",\n" + " \"properties\": {\n" + " \"__alwaysAllowNull\": \"true\",\n" + + " \"__jsr310ConversionEnabled\": \"false\",\n" + " \"bar1\": \"bar-value1\",\n" + " \"bar2\": \"bar-value2\",\n" + " \"bar3\": \"bar-value3\"\n" @@ -132,6 +133,7 @@ public class SchemaInfoTest { + " \"type\": \"AVRO\",\n" + " \"properties\": {\n" + " \"__alwaysAllowNull\": \"false\",\n" + + " \"__jsr310ConversionEnabled\": \"false\",\n" + " \"foo1\": \"foo-value1\",\n" + " \"foo2\": \"foo-value2\",\n" + " \"foo3\": \"foo-value3\"\n" @@ -206,6 +208,7 @@ public class SchemaInfoTest { + " \"type\": \"AVRO\",\n" + " \"properties\": {\n" + " \"__alwaysAllowNull\": \"false\",\n" + + " \"__jsr310ConversionEnabled\": \"false\",\n" + " \"foo1\": \"foo-value1\",\n" + " \"foo2\": \"foo-value2\",\n" + " \"foo3\": \"foo-value3\"\n" @@ -227,6 +230,7 @@ public class SchemaInfoTest { + " \"type\": \"JSON\",\n" + " \"properties\": {\n" + " \"__alwaysAllowNull\": \"true\",\n" + + " \"__jsr310ConversionEnabled\": \"false\",\n" + " \"bar1\": \"bar-value1\",\n" + " \"bar2\": \"bar-value2\",\n" + " \"bar3\": \"bar-value3\"\n" @@ -236,11 +240,11 @@ public class SchemaInfoTest { + " \"type\": \"KEY_VALUE\",\n" + " \"properties\": {\n" + " \"key.schema.name\": \"\",\n" - + " \"key.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"false\\\",\\\"foo1\\\":\\\"foo-value1\\\",\\\"foo2\\\":\\\"foo-value2\\\",\\\"foo3\\\":\\\"foo-value3\\\"}\",\n" + + " \"key.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"false\\\",\\\"__jsr310ConversionEnabled\\\":\\\"false\\\",\\\"foo1\\\":\\\"foo-value1\\\",\\\"foo2\\\":\\\"foo-value2\\\",\\\"foo3\\\":\\\"foo-value3\\\"}\",\n" + " \"key.schema.type\": \"AVRO\",\n" + " \"kv.encoding.type\": \"SEPARATED\",\n" + " \"value.schema.name\": \"\",\n" - + " \"value.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"true\\\",\\\"bar1\\\":\\\"bar-value1\\\",\\\"bar2\\\":\\\"bar-value2\\\",\\\"bar3\\\":\\\"bar-value3\\\"}\",\n" + + " \"value.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"true\\\",\\\"__jsr310ConversionEnabled\\\":\\\"false\\\",\\\"bar1\\\":\\\"bar-value1\\\",\\\"bar2\\\":\\\"bar-value2\\\",\\\"bar3\\\":\\\"bar-value3\\\"}\",\n" + " \"value.schema.type\": \"JSON\"\n" + " }\n" + "}"; diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 4a3016e42e888..31518240cd6a5 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -132,6 +132,12 @@ ${rabbitmq-client.version} + + joda-time + joda-time + test + + com.facebook.presto presto-jdbc diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java new file mode 100644 index 0000000000000..49a3f8aaa06dc --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/JodaTimeTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.schema; + +import com.google.common.collect.Sets; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; + +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.testng.Assert.assertEquals; + +@Slf4j +public class JodaTimeTest extends PulsarTestSuite { + + private PulsarClient client; + private PulsarAdmin admin; + + @BeforeMethod + public void setup() throws Exception { + this.client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + this.admin = PulsarAdmin.builder() + .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) + .build(); + } + + @Data + private static class JodaSchema { + + @org.apache.avro.reflect.AvroSchema("{\n" + + " \"type\": \"bytes\",\n" + + " \"logicalType\": \"decimal\",\n" + + " \"precision\": 4,\n" + + " \"scale\": 2\n" + + "}") + BigDecimal decimal; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}") + LocalDate date; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}") + DateTime timestampMillis; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}") + LocalTime timeMillis; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}") + long timestampMicros; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}") + long timeMicros; + } + + @Test + public void testJodaTime() throws PulsarAdminException, PulsarClientException { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topic = "test-joda-time-schema"; + final String fqtn = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topic + ).toString(); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(pulsarCluster.getClusterName()) + ); + + JodaSchema forSend = new JodaSchema(); + forSend.setDecimal(new BigDecimal("12.34")); + forSend.setTimeMicros(System.currentTimeMillis() * 1000); + forSend.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC())); + forSend.setTimeMillis(LocalTime.now()); + forSend.setTimeMicros(System.currentTimeMillis() * 1000); + forSend.setDate(LocalDate.now()); + + Producer producer = client + .newProducer(Schema.AVRO(JodaSchema.class)) + .topic(fqtn) + .create(); + + Consumer consumer = client + .newConsumer(Schema.AVRO(JodaSchema.class)) + .topic(fqtn) + .subscriptionName("test") + .subscribe(); + + producer.send(forSend); + JodaSchema received = consumer.receive().getValue(); + assertEquals(received, forSend); + + producer.close(); + consumer.close(); + + log.info("Successfully Joda time logical type message : {}", received); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java index 70a469290d848..4b0083a76094c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java @@ -193,7 +193,8 @@ public void testAvroLogicalType() throws Exception { .build(); Producer producer = client - .newProducer(Schema.AVRO(AvroLogicalType.class)) + .newProducer(Schema.AVRO(SchemaDefinition.builder().withPojo(AvroLogicalType.class) + .withJSR310ConversionEnabled(true).build())) .topic(fqtn) .create(); @@ -207,12 +208,7 @@ public void testAvroLogicalType() throws Exception { log.info("Successfully published avro logical type message : {}", messageForSend); AvroLogicalType received = consumer.receive().getValue(); - assertEquals(messageForSend.getDecimal(), received.getDecimal()); - assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros()); - assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis()); - assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros()); - assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis()); - assertEquals(messageForSend.getDate(), received.getDate()); + assertEquals(received, messageForSend); producer.close(); consumer.close(); diff --git a/tests/integration/src/test/resources/pulsar-schema.xml b/tests/integration/src/test/resources/pulsar-schema.xml index 7b3b21ef0dea6..59178052b7054 100644 --- a/tests/integration/src/test/resources/pulsar-schema.xml +++ b/tests/integration/src/test/resources/pulsar-schema.xml @@ -23,6 +23,7 @@ +