Skip to content

Add Joda time logical type conversion. #6704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-integration-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ jobs:
- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci-integration-thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ jobs:
- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
17 changes: 8 additions & 9 deletions .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,20 @@ jobs:
with:
java-version: 1.8

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
free -h

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1

- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/ci-unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ jobs:
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h

- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ static <T> SchemaDefinitionBuilder<T> builder() {
*/
boolean getAlwaysAllowNull();

/**
* Get JSR310 conversion enabled.
*
* @return return true if enable JSR310 conversion. false means use Joda time conversion.
*/
boolean isJsr310ConversionEnabled();

/**
* Get schema class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ public interface SchemaDefinitionBuilder<T> {
*/
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);

/**
* Set schema use JRS310 conversion or not.
*
* <p>Before Avro 1.9 the Joda time library was used for handling the logical date(time) values.
* But since the introduction of Java8 the Java Specification Request (JSR) 310 has been included,
* which greatly improves the handling of date and time natively. To keep forwarding compatibility,
* default is use Joda time conversion.
*
* <p>JSR310 conversion is recommended here. Joda time conversion is has been marked deprecated.
* In future versions, joda time conversion may be removed
*
* @param jsr310ConversionEnabled use JRS310 conversion or not, default is false for keep forwarding compatibility
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);

/**
* Set schema info properties.
*
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-messagecrypto-bc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@
<artifactId>bouncy-castle-bc-shaded</artifactId>
<version>${project.parent.version}</version>
</dependency>

</dependencies>
</project>
6 changes: 6 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.JodaTimeConversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -42,36 +43,14 @@
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private ClassLoader pojoClassLoader;

private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
setReader(new AvroReader<>(schema, pojoClassLoader));
setWriter(new AvroWriter<>(schema));
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}

@Override
Expand Down Expand Up @@ -116,7 +95,8 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition(), schemaInfo.toString());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
Expand All @@ -125,4 +105,30 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
}
}

private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
if (schemaInfo != null) {
return Boolean.parseBoolean(schemaInfo.getProperties()
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
}
return false;
}

public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
if (jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
} else {
try {
Class.forName("org.joda.time.DateTime");
reflectData.addLogicalTypeConversion(new JodaTimeConversions.TimestampConversion());
} catch (ClassNotFoundException e) {
// Skip if have not provide joda-time dependency.
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {

public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
public static final String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled";

/**
* the schema definition class
*/
private Class<T> clazz;

/**
* The flag of schema type always allow null
*
Expand All @@ -48,6 +50,13 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private boolean alwaysAllowNull = true;

/**
* The flag of use JSR310 conversion or Joda time conversion.
*
* If value is true, use JSR310 conversion in the Avro schema. Otherwise, use Joda time conversion.
*/
private boolean jsr310ConversionEnabled = false;

/**
* The schema info properties
*/
Expand All @@ -69,6 +78,12 @@ public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
return this;
}

@Override
public SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled) {
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
return this;
}

@Override
public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
this.properties.put(key, value);
Expand Down Expand Up @@ -107,8 +122,10 @@ public SchemaDefinition<T> build() {
checkArgument(!(StringUtils.isNotBlank(jsonDef) && clazz != null),
"Not allowed to set pojo and jsonDef both for the schema definition.");

properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
jsr310ConversionEnabled);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
* false you can define the field by yourself by the annotation@Nullable
*
*/
private boolean alwaysAllowNull;
private final boolean alwaysAllowNull;

private Map<String, String> properties;
private final Map<String, String> properties;

private String jsonDef;
private final String jsonDef;

private boolean supportSchemaVersioning;
private final boolean supportSchemaVersioning;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
private final boolean jsr310ConversionEnabled;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
this.supportSchemaVersioning = supportSchemaVersioning;
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
}
/**
* get schema whether always allow null or not
Expand All @@ -66,6 +70,11 @@ public boolean getAlwaysAllowNull() {
return alwaysAllowNull;
}

@Override
public boolean isJsr310ConversionEnabled() {
return jsr310ConversionEnabled;
}

/**
* Get json schema definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.pulsar.client.impl.schema.reader;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;

import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,30 +43,21 @@ public AvroReader(Schema schema) {
this.reader = new ReflectDatumReader<>(schema);
}

public AvroReader(Schema schema, ClassLoader classLoader) {
public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(schema);
}
}

public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader,
boolean jsr310ConversionEnabled) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.AvroSchema;

import java.io.ByteArrayOutputStream;

Expand All @@ -33,9 +35,15 @@ public class AvroWriter<T> implements SchemaWriter<T> {
private ByteArrayOutputStream byteArrayOutputStream;

public AvroWriter(Schema schema) {
this(schema, false);
}

public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.writer = new ReflectDatumWriter<>(schema);
ReflectData reflectData = new ReflectData();
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
}

@Override
Expand Down
Loading