Pulsar function Should support Generic output Message #18875
pointearth
started this conversation in
General
Replies: 3 comments
-
This is related to: I really would like to see this feature land soon. |
Beta Was this translation helpful? Give feedback.
0 replies
-
The issue had no activity for 30 days, mark with Stale label. |
Beta Was this translation helpful? Give feedback.
0 replies
-
Move feature "request" to the discussion forum. |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Is your feature request related to a problem? Please describe.
In the pulsar function, I don't want to define a particular class as O in
public interface Function<I, O> { O process(I var1, Context var2) throws Exception; }
because I have a POJO class, but I always want to push a wrap class to wrap the POJO class as output. because the only different thing between this wrap class and the POJO class is wrap class always adds a JSON field to any of POJO class.
We can prodcue a Generic message with pulsar.client.api:
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record(USER_CONST.SCHEMA_NAME); recordSchemaBuilder.field(USER_CONST.AGE).type(SchemaType.INT32); recordSchemaBuilder.field(USER_CONST.NAME).type(SchemaType.STRING); recordSchemaBuilder.field(USER_CONST.JSON_FOR_EVENT).type(SchemaType.STRING); SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO); GenericSchema userSchema = Schema.generic(schemaInfo); Producer<GenericRecord> producer = pulsarClient.newProducer( userSchema) .topic(TOPIC_CONST.TOPIC_FUN_GENERIC) .create(); User user = new User(); user.age = 27; user.name = "Simon Generic"; Gson gson = new Gson(); String jsonForEvent = gson.toJson(user); GenericRecordBuilder genericRecordBuilder = userSchema.newRecordBuilder(); GenericRecord genericRecord = genericRecordBuilder .set(USER_CONST.AGE, user.age) .set(USER_CONST.NAME,user.name) .set(USER_CONST.JSON_FOR_EVENT, jsonForEvent) .build();
but, I am trying many ways to produce dynamic object into a pulsar, just like:
public class FunWrapGeneric implements Function<String, GenericRecord> { @Override public GenericRecord process(String input, Context context) throws Exception {
but I always got failed. Can we support this?
Describe the solution you'd like
Add GenericRecord support for pulsar function
public class FunWrapGeneric implements Function<String, GenericRecord>
or Add a generic Class
public class FunWrapGeneric implements Function<String, RecordWrap<T>> { @Override public RecordWrap<T> process(String input, Context context) throws Exception {
and interface RecordWrap
support add or reduce fields from T.
or Add an override of pulsar function want a Generic class as a parameter of pulsar function
FunWrapGeneric implements Function<String,Schema>
cc @sijie @codelipenghui
Beta Was this translation helpful? Give feedback.
All reactions