Skip to content

Commit 5a1264d

Browse files
authored
Update docs for Go client (#10490)
* Update docs for Go client Signed-off-by: xiaolongran <[email protected]> * Fix comments Signed-off-by: xiaolongran <[email protected]> Co-authored-by: xiaolongran <[email protected]>
1 parent 0355a63 commit 5a1264d

File tree

1 file changed

+116
-1
lines changed

1 file changed

+116
-1
lines changed

site2/docs/client-libraries-go.md

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ The following configurable parameters are available for Pulsar clients:
117117
| TLSTrustCertsFilePath | Set the path to the trusted TLS certificate file | |
118118
| TLSAllowInsecureConnection | Configure whether the Pulsar client accept untrusted TLS certificate from broker | false |
119119
| TLSValidateHostname | Configure whether the Pulsar client verify the validity of the host name from broker | false |
120+
| ListenerName | Configure the net model for VPC users to connect to the Pulsar broker | |
121+
| MaxConnectionsPerBroker | Max number of connections to a single broker that is kept in the pool | 1 |
122+
| CustomMetricsLabels | Add custom labels to all the metrics reported by this client instance | |
123+
| Logger | Configure the logger used by the client | logrus.StandardLogger |
120124

121125
## Producers
122126

@@ -196,6 +200,52 @@ if err != nil {
196200
defer producer.Close()
197201
```
198202

203+
#### How to use schema interface in producer
204+
205+
```go
206+
type testJSON struct {
207+
ID int `json:"id"`
208+
Name string `json:"name"`
209+
}
210+
```
211+
212+
```go
213+
var (
214+
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
215+
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
216+
)
217+
```
218+
219+
```go
220+
client, err := NewClient(pulsar.ClientOptions{
221+
URL: "pulsar://localhost:6650",
222+
})
223+
if err != nil {
224+
log.Fatal(err)
225+
}
226+
defer client.Close()
227+
228+
properties := make(map[string]string)
229+
properties["pulsar"] = "hello"
230+
jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
231+
producer, err := client.CreateProducer(ProducerOptions{
232+
Topic: "jsonTopic",
233+
Schema: jsonSchemaWithProperties,
234+
})
235+
assert.Nil(t, err)
236+
237+
_, err = producer.Send(context.Background(), &ProducerMessage{
238+
Value: &testJSON{
239+
ID: 100,
240+
Name: "pulsar",
241+
},
242+
})
243+
if err != nil {
244+
log.Fatal(err)
245+
}
246+
producer.Close()
247+
```
248+
199249
#### How to use delay relative in producer
200250

201251
```go
@@ -260,13 +310,21 @@ canc()
260310
| Topic | Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader. | |
261311
| Name | Name specify a name for the producer. If not assigned, the system will generate a globally unique name which can be access with Producer.ProducerName(). | |
262312
| Properties | Properties attach a set of application defined properties to the producer This properties will be visible in the topic stats | |
313+
| SendTimeout | SendTimeout set the timeout for a message that is not acknowledged by the server | 30s |
314+
| DisableBlockIfQueueFull | DisableBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full | false |
263315
| MaxPendingMessages| MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. | |
264316
| HashingScheme | HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message. | JavaStringHash |
265317
| CompressionType | CompressionType set the compression type for the producer. | not compressed |
318+
| CompressionLevel | Define the desired compression level. Options: Default, Faster and Better | Default |
266319
| MessageRouter | MessageRouter set a custom message routing policy by passing an implementation of MessageRouter | |
267320
| DisableBatching | DisableBatching control whether automatic batching of messages is enabled for the producer. | false |
268321
| BatchingMaxPublishDelay | BatchingMaxPublishDelay set the time period within which the messages sent will be batched | 1ms |
269322
| BatchingMaxMessages | BatchingMaxMessages set the maximum number of messages permitted in a batch. | 1000 |
323+
| BatchingMaxSize | BatchingMaxSize sets the maximum number of bytes permitted in a batch. | 128KB |
324+
| Schema | Schema set a custom schema type by passing an implementation of `Schema` | bytes[] |
325+
| Interceptors | A chain of interceptors. These interceptors are called at some points defined in the `ProducerInterceptor` interface. | None |
326+
| MaxReconnectToBroker | MaxReconnectToBroker set the maximum retry number of reconnectToBroker | ultimate |
327+
| BatcherBuilderType | BatcherBuilderType sets the batch builder type. This is used to create a batch container when batching is enabled. Options: DefaultBatchBuilder and KeyBasedBatchBuilder | DefaultBatchBuilder |
270328

271329
## Consumers
272330

@@ -309,17 +367,20 @@ Method | Description | Return type
309367
`Subscription()` | Returns the consumer's subscription name | `string`
310368
`Unsubcribe()` | Unsubscribes the consumer from the assigned topic. Throws an error if the unsubscribe operation is somehow unsuccessful. | `error`
311369
`Receive(context.Context)` | Receives a single message from the topic. This method blocks until a message is available. | `(Message, error)`
370+
`Chan()` | Chan returns a channel from which to consume messages. | `<-chan ConsumerMessage`
312371
`Ack(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) |
313372
`AckID(MessageID)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) by message ID |
373+
`ReconsumeLater(msg Message, delay time.Duration)` | ReconsumeLater mark a message for redelivery after custom delay |
314374
`Nack(Message)` | Acknowledge the failure to process a single message. |
315375
`NackID(MessageID)` | Acknowledge the failure to process a single message. |
316376
`Seek(msgID MessageID)` | Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic. | `error`
317377
`SeekByTime(time time.Time)` | Reset the subscription associated with this consumer to a specific message publish time. | `error`
318378
`Close()` | Closes the consumer, disabling its ability to receive messages from the broker |
379+
`Name()` | Name returns the name of consumer | `string`
319380

320381
### Receive example
321382

322-
#### How to use regx consumer
383+
#### How to use regex consumer
323384

324385
```go
325386
client, err := pulsar.NewClient(pulsar.ClientOptions{
@@ -458,6 +519,51 @@ if err != nil {
458519
}
459520
```
460521

522+
#### How to use schema in consumer
523+
524+
```go
525+
type testJSON struct {
526+
ID int `json:"id"`
527+
Name string `json:"name"`
528+
}
529+
```
530+
531+
```go
532+
var (
533+
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
534+
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
535+
)
536+
```
537+
538+
```go
539+
client, err := NewClient(pulsar.ClientOptions{
540+
URL: "pulsar://localhost:6650",
541+
})
542+
if err != nil {
543+
log.Fatal(err)
544+
}
545+
defer client.Close()
546+
547+
var s testJSON
548+
549+
consumerJS := NewJSONSchema(exampleSchemaDef, nil)
550+
consumer, err := client.Subscribe(ConsumerOptions{
551+
Topic: "jsonTopic",
552+
SubscriptionName: "sub-1",
553+
Schema: consumerJS,
554+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
555+
})
556+
assert.Nil(t, err)
557+
msg, err := consumer.Receive(context.Background())
558+
assert.Nil(t, err)
559+
err = msg.GetSchemaValue(&s)
560+
if err != nil {
561+
log.Fatal(err)
562+
}
563+
564+
defer consumer.Close()
565+
```
566+
461567

462568
### Consumer configuration
463569

@@ -478,6 +584,11 @@ if err != nil {
478584
| NackRedeliveryDelay | The delay after which to redeliver the messages that failed to be processed | 1min |
479585
| ReadCompacted | If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic | false |
480586
| ReplicateSubscriptionState | Mark the subscription as replicated to keep it in sync across clusters | false |
587+
| KeySharedPolicy | Configuration for Key Shared consumer policy. | |
588+
| RetryEnable | Auto retry send messages to default filled DLQPolicy topics | false |
589+
| Interceptors | A chain of interceptors. These interceptors are called at some points defined in the `ConsumerInterceptor` interface. | |
590+
| MaxReconnectToBroker | MaxReconnectToBroker set the maximum retry number of reconnectToBroker. | ultimate |
591+
| Schema | Schema set a custom schema type by passing an implementation of `Schema` | bytes[] |
481592

482593
## Readers
483594

@@ -504,6 +615,8 @@ Method | Description | Return type
504615
`Next(context.Context)` | Receives the next message on the topic (analogous to the `Receive` method for [consumers](#consumer-operations)). This method blocks until a message is available. | `(Message, error)`
505616
`HasNext()` | Check if there is any message available to read from the current position| (bool, error)
506617
`Close()` | Closes the reader, disabling its ability to receive messages from the broker | `error`
618+
`Seek(MessageID)` | Reset the subscription associated with this reader to a specific message ID | `error`
619+
`SeekByTime(time time.Time)` | Reset the subscription associated with this reader to a specific message publish time | `error`
507620

508621
### Reader example
509622

@@ -666,7 +779,9 @@ The following methods parameters are available for `ProducerMessage` objects:
666779
Parameter | Description
667780
:---------|:-----------
668781
`Payload` | The actual data payload of the message
782+
`Value` | Value and payload is mutually exclusive, `Value interface{}` for schema message.
669783
`Key` | The optional key associated with the message (particularly useful for things like topic compaction)
784+
`OrderingKey` | OrderingKey sets the ordering key of the message.
670785
`Properties` | A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message
671786
`EventTime` | The timestamp associated with the message
672787
`ReplicationClusters` | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.

0 commit comments

Comments
 (0)