Skip to content

Commit 2aff473

Browse files
authored
[issue #6765] Expose definition flags to function (#6868)
1) Change the value format of the CLI tool parameter "--custom-schema-inputs" from "topicName -> schemaType" to "topicName -> {"schemaType": "type", "jsr310ConversionEnabled": true, "alwaysAllowNull": true}". 2) Modify Function.proto to add the properties "jsr310ConversionEnabled" and "alwaysAllowNull", so that the two parameters above can be received. 3) Modify the "FunctionConfigUtils#convert" method to put the two parameters from "CustomSchemaInputs" into ConsumerSpec. 4) In the "JavaInstanceRunnable#setupInput" method, put the two parameters into "ConsumerConfig" and pass it to "PulsarSource", which sets them on the "schema" when creating the Source's consumer, so that the function can get these two parameters from the "currentRecord" message.
1 parent dbc0649 commit 2aff473

File tree

14 files changed

+248
-23
lines changed

14 files changed

+248
-23
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import org.apache.pulsar.client.api.Producer;
6060
import org.apache.pulsar.client.api.PulsarClient;
6161
import org.apache.pulsar.client.api.Schema;
62+
import org.apache.pulsar.client.api.schema.GenericRecord;
63+
import org.apache.pulsar.client.api.schema.SchemaDefinition;
6264
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
6365
import org.apache.pulsar.common.functions.ConsumerConfig;
6466
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -72,6 +74,7 @@
7274
import org.apache.pulsar.common.util.FutureUtil;
7375
import org.apache.pulsar.common.util.ObjectMapperFactory;
7476
import org.apache.pulsar.functions.LocalRunner;
77+
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
7578
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
7679
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
7780
import org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink;
@@ -479,11 +482,134 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti
479482
}
480483
}
481484

485+
public void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception {
486+
487+
final String namespacePortion = "io";
488+
final String replNamespace = tenant + "/" + namespacePortion;
489+
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
490+
final String sinkTopic = "persistent://" + replNamespace + "/output";
491+
final String propertyKey = "key";
492+
final String propertyValue = "value";
493+
final String functionName = "PulsarFunction-test";
494+
final String subscriptionName = "test-sub";
495+
admin.namespaces().createNamespace(replNamespace);
496+
Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
497+
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
498+
499+
500+
Schema schema = Schema.AVRO(SchemaDefinition.builder()
501+
.withAlwaysAllowNull(true)
502+
.withJSR310ConversionEnabled(true)
503+
.withPojo(AvroTestObject.class).build());
504+
//use AVRO schema
505+
admin.schemas().createSchema(sourceTopic, schema.getSchemaInfo());
506+
507+
//produce message to sourceTopic
508+
Producer<AvroTestObject> producer = pulsarClient.newProducer(schema).topic(sourceTopic).create();
509+
//consume message from sinkTopic
510+
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(sinkTopic).subscriptionName("sub").subscribe();
511+
512+
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
513+
sourceTopic, sinkTopic, subscriptionName);
514+
//set jsr310ConversionEnabled、alwaysAllowNull
515+
Map<String,String> schemaInput = new HashMap<>();
516+
schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
517+
Map<String, String> schemaOutput = new HashMap<>();
518+
schemaOutput.put(sinkTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
519+
520+
functionConfig.setCustomSchemaInputs(schemaInput);
521+
functionConfig.setCustomSchemaOutputs(schemaOutput);
522+
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
523+
if (jarFilePathUrl == null) {
524+
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction");
525+
} else {
526+
functionConfig.setJar(jarFilePathUrl);
527+
}
528+
529+
LocalRunner localRunner = LocalRunner.builder()
530+
.functionConfig(functionConfig)
531+
.clientAuthPlugin(AuthenticationTls.class.getName())
532+
.clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
533+
.useTls(true)
534+
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
535+
.tlsAllowInsecureConnection(true)
536+
.tlsHostNameVerificationEnabled(false)
537+
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
538+
localRunner.start(false);
539+
540+
retryStrategically((test) -> {
541+
try {
542+
TopicStats stats = admin.topics().getStats(sourceTopic);
543+
return stats.subscriptions.get(subscriptionName) != null
544+
&& !stats.subscriptions.get(subscriptionName).consumers.isEmpty();
545+
} catch (PulsarAdminException e) {
546+
return false;
547+
}
548+
}, 50, 150);
549+
550+
int totalMsgs = 5;
551+
for (int i = 0; i < totalMsgs; i++) {
552+
AvroTestObject avroTestObject = new AvroTestObject();
553+
avroTestObject.setBaseValue(i);
554+
producer.newMessage().property(propertyKey, propertyValue)
555+
.value(avroTestObject).send();
556+
}
557+
558+
//consume message from sinkTopic
559+
for (int i = 0; i < totalMsgs; i++) {
560+
Message<GenericRecord> msg = consumer.receive(5, TimeUnit.SECONDS);
561+
String receivedPropertyValue = msg.getProperty(propertyKey);
562+
assertEquals(propertyValue, receivedPropertyValue);
563+
assertEquals(msg.getValue().getField("baseValue"), 10 + i);
564+
consumer.acknowledge(msg);
565+
}
566+
567+
// validate pulsar-sink consumer has consumed all messages
568+
assertNotEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().unackedMessages, 0);
569+
localRunner.stop();
570+
571+
retryStrategically((test) -> {
572+
try {
573+
TopicStats topicStats = admin.topics().getStats(sourceTopic);
574+
return topicStats.subscriptions.get(subscriptionName) != null
575+
&& topicStats.subscriptions.get(subscriptionName).consumers.isEmpty();
576+
} catch (PulsarAdminException e) {
577+
return false;
578+
}
579+
}, 20, 150);
580+
581+
//change the schema ,the function should not run, resulting in no messages to consume
582+
schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"false\",\"__alwaysAllowNull\":\"false\"}}");
583+
localRunner = LocalRunner.builder()
584+
.functionConfig(functionConfig)
585+
.clientAuthPlugin(AuthenticationTls.class.getName())
586+
.clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
587+
.useTls(true)
588+
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
589+
.tlsAllowInsecureConnection(true)
590+
.tlsHostNameVerificationEnabled(false)
591+
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
592+
localRunner.start(false);
593+
594+
producer.newMessage().property(propertyKey, propertyValue).value(new AvroTestObject()).send();
595+
Message<GenericRecord> msg = consumer.receive(2, TimeUnit.SECONDS);
596+
assertEquals(msg, null);
597+
598+
producer.close();
599+
consumer.close();
600+
localRunner.stop();
601+
}
602+
482603
@Test(timeOut = 20000)
483604
public void testE2EPulsarFunctionLocalRun() throws Exception {
484605
testE2EPulsarFunctionLocalRun(null);
485606
}
486607

608+
@Test(timeOut = 30000)
609+
public void testAvroFunctionLocalRun() throws Exception {
610+
testAvroFunctionLocalRun(null);
611+
}
612+
487613
@Test(timeOut = 20000)
488614
public void testE2EPulsarFunctionLocalRunWithJar() throws Exception {
489615
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
231231
protected String DEPRECATED_customSerdeInputString;
232232
@Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)")
233233
protected String customSerdeInputString;
234-
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema class names (as a JSON string)")
234+
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema properties (as a JSON string)")
235235
protected String customSchemaInputString;
236+
@Parameter(names = "--custom-schema-outputs", description = "The map of input topics to Schema properties (as a JSON string)")
237+
protected String customSchemaOutputString;
236238
// for backwards compatibility purposes
237239
@Parameter(names = "--outputSerdeClassName", description = "The SerDe class to be used for messages output by the function", hidden = true)
238240
protected String DEPRECATED_outputSerdeClassName;
@@ -368,6 +370,11 @@ void processArguments() throws Exception {
368370
Map<String, String> customschemaInputMap = new Gson().fromJson(customSchemaInputString, type);
369371
functionConfig.setCustomSchemaInputs(customschemaInputMap);
370372
}
373+
if (null != customSchemaOutputString) {
374+
Type type = new TypeToken<Map<String, String>>() {}.getType();
375+
Map<String, String> customSchemaOutputMap = new Gson().fromJson(customSchemaOutputString, type);
376+
functionConfig.setCustomSchemaOutputs(customSchemaOutputMap);
377+
}
371378
if (null != topicsPattern) {
372379
functionConfig.setTopicsPattern(topicsPattern);
373380
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ public SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSch
117117
@Override
118118
public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
119119
this.properties = properties;
120+
if (properties.containsKey(ALWAYS_ALLOW_NULL)) {
121+
alwaysAllowNull = Boolean.parseBoolean(properties.get(ALWAYS_ALLOW_NULL));
122+
}
123+
if (properties.containsKey(ALWAYS_ALLOW_NULL)) {
124+
jsr310ConversionEnabled = Boolean.parseBoolean(properties.get(JSR310_CONVERSION_ENABLED));
125+
}
120126
return this;
121127
}
122128

pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
*/
1919
package org.apache.pulsar.common.functions;
2020

21+
import java.util.HashMap;
22+
import java.util.Map;
23+
2124
import lombok.AllArgsConstructor;
2225
import lombok.Builder;
2326
import lombok.Data;
2427
import lombok.EqualsAndHashCode;
2528
import lombok.NoArgsConstructor;
2629

30+
2731
/**
2832
* Configuration of a consumer.
2933
*/
@@ -36,5 +40,12 @@ public class ConsumerConfig {
3640
private String schemaType;
3741
private String serdeClassName;
3842
private boolean isRegexPattern;
43+
@Builder.Default
44+
private Map<String, String> schemaProperties = new HashMap<>();
3945
private Integer receiverQueueSize;
46+
47+
/**
 * Creates a consumer configuration with only the schema type set and an empty
 * schema-properties map.
 *
 * @param schemaType the schema type to assign to this configuration
 */
public ConsumerConfig(String schemaType) {
    this.schemaType = schemaType;
    // NOTE(review): schemaProperties is declared with @Builder.Default, and Lombok is
    // understood to move that initializer into the builder, so a hand-written
    // constructor would otherwise leave the map null. Assign it explicitly.
    this.schemaProperties = new HashMap<>();
}
50+
4051
}

pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public enum Runtime {
6868
private Map<String, String> customSerdeInputs;
6969
private String topicsPattern;
7070
private Map<String, String> customSchemaInputs;
71+
private Map<String, String> customSchemaOutputs;
7172

7273
/**
7374
* A generalized way of specifying inputs.

pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class SinkConfig {
5454

5555
private Map<String, String> topicToSchemaType;
5656

57+
private Map<String, String> topicToSchemaProperties;
58+
5759
private Map<String, ConsumerConfig> inputSpecs;
5860

5961
private Integer maxMessageRetries;

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
682682
} else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
683683
consumerConfig.setSerdeClassName(conf.getSerdeClassName());
684684
}
685+
consumerConfig.setSchemaProperties(conf.getSchemaPropertiesMap());
685686
if (conf.hasReceiverQueueSize()) {
686687
consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
687688
}
@@ -796,6 +797,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
796797
}
797798

798799
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
800+
pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());
799801

800802
object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
801803
}

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.pulsar.client.api.Schema;
3333
import org.apache.pulsar.client.api.TypedMessageBuilder;
3434
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
35+
import org.apache.pulsar.common.functions.ConsumerConfig;
3536
import org.apache.pulsar.common.functions.FunctionConfig;
3637
import org.apache.pulsar.common.schema.KeyValueEncodingType;
3738
import org.apache.pulsar.functions.api.Record;
@@ -339,13 +340,16 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
339340
// return type is 'void', so there's no schema to check
340341
return null;
341342
}
342-
343+
ConsumerConfig consumerConfig = new ConsumerConfig();
344+
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
343345
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
346+
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
344347
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
345-
pulsarSinkConfig.getSchemaType(), false);
348+
consumerConfig, false);
346349
} else {
350+
consumerConfig.setSchemaType(pulsarSinkConfig.getSerdeClassName());
347351
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
348-
pulsarSinkConfig.getSerdeClassName(), false, functionClassLoader);
352+
consumerConfig, false, functionClassLoader);
349353
}
350354
}
351355
}

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import lombok.ToString;
2424
import org.apache.pulsar.common.functions.FunctionConfig;
2525

26+
import java.util.Map;
27+
2628
@Getter
2729
@Setter
2830
@ToString
@@ -31,6 +33,7 @@ public class PulsarSinkConfig {
3133
private String topic;
3234
private String serdeClassName;
3335
private String schemaType;
36+
private Map<String, String> schemaProperties;
3437
private String typeClassName;
3538
private boolean forwardSourceMessageProperty;
3639
}

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundExcept
166166
if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
167167
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);
168168
} else {
169-
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);
169+
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf, true);
170170
}
171171
configs.put(topic,
172172
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).build());

0 commit comments

Comments
 (0)