Skip to content

Update docs for Go client #10490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion site2/docs/client-libraries-go.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ The following configurable parameters are available for Pulsar clients:
| TLSTrustCertsFilePath | Set the path to the trusted TLS certificate file | |
| TLSAllowInsecureConnection | Configure whether the Pulsar client accept untrusted TLS certificate from broker | false |
| TLSValidateHostname | Configure whether the Pulsar client verify the validity of the host name from broker | false |
| ListenerName | Configure the net model for VPC users to connect to the Pulsar broker | |
| MaxConnectionsPerBroker | Max number of connections to a single broker that is kept in the pool | 1 |
| CustomMetricsLabels | Add custom labels to all the metrics reported by this client instance | |
| Logger | Configure the logger used by the client | logrus.StandardLogger |

## Producers

Expand Down Expand Up @@ -196,6 +200,52 @@ if err != nil {
defer producer.Close()
```

#### How to use schema interface in producer

```go
type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}
```

```go
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
```

```go
client, err := NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

properties := make(map[string]string)
properties["pulsar"] = "hello"
jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(ProducerOptions{
Topic: "jsonTopic",
Schema: jsonSchemaWithProperties,
})
assert.Nil(t, err)

_, err = producer.Send(context.Background(), &ProducerMessage{
Value: &testJSON{
ID: 100,
Name: "pulsar",
},
})
if err != nil {
log.Fatal(err)
}
producer.Close()
```

#### How to use delay relative in producer

```go
Expand Down Expand Up @@ -260,13 +310,21 @@ canc()
| Topic | Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader. | |
| Name | Name specify a name for the producer. If not assigned, the system will generate a globally unique name which can be access with Producer.ProducerName(). | |
| Properties | Properties attach a set of application defined properties to the producer This properties will be visible in the topic stats | |
| SendTimeout | SendTimeout set the timeout for a message that is not acknowledged by the server | 30s |
| DisableBlockIfQueueFull | DisableBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full | false |
| MaxPendingMessages| MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. | |
| HashingScheme | HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message. | JavaStringHash |
| CompressionType | CompressionType set the compression type for the producer. | not compressed |
| CompressionLevel | Define the desired compression level. Options: Default, Faster and Better | Default |
| MessageRouter | MessageRouter set a custom message routing policy by passing an implementation of MessageRouter | |
| DisableBatching | DisableBatching control whether automatic batching of messages is enabled for the producer. | false |
| BatchingMaxPublishDelay | BatchingMaxPublishDelay set the time period within which the messages sent will be batched | 1ms |
| BatchingMaxMessages | BatchingMaxMessages set the maximum number of messages permitted in a batch. | 1000 |
| BatchingMaxSize | BatchingMaxSize sets the maximum number of bytes permitted in a batch. | 128KB |
| Schema | Schema set a custom schema type by passing an implementation of `Schema` | bytes[] |
| Interceptors | A chain of interceptors. These interceptors are called at some points defined in the `ProducerInterceptor` interface. | None |
| MaxReconnectToBroker | MaxReconnectToBroker set the maximum retry number of reconnectToBroker | ultimate |
| BatcherBuilderType | BatcherBuilderType sets the batch builder type. This is used to create a batch container when batching is enabled. Options: DefaultBatchBuilder and KeyBasedBatchBuilder | DefaultBatchBuilder |

## Consumers

Expand Down Expand Up @@ -309,17 +367,20 @@ Method | Description | Return type
`Subscription()` | Returns the consumer's subscription name | `string`
`Unsubcribe()` | Unsubscribes the consumer from the assigned topic. Throws an error if the unsubscribe operation is somehow unsuccessful. | `error`
`Receive(context.Context)` | Receives a single message from the topic. This method blocks until a message is available. | `(Message, error)`
`Chan()` | Chan returns a channel from which to consume messages. | `<-chan ConsumerMessage`
`Ack(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) |
`AckID(MessageID)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) by message ID |
`ReconsumeLater(msg Message, delay time.Duration)` | ReconsumeLater mark a message for redelivery after custom delay |
`Nack(Message)` | Acknowledge the failure to process a single message. |
`NackID(MessageID)` | Acknowledge the failure to process a single message. |
`Seek(msgID MessageID)` | Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic. | `error`
`SeekByTime(time time.Time)` | Reset the subscription associated with this consumer to a specific message publish time. | `error`
`Close()` | Closes the consumer, disabling its ability to receive messages from the broker |
`Name()` | Name returns the name of consumer | `string`

### Receive example

#### How to use regx consumer
#### How to use regex consumer

```go
client, err := pulsar.NewClient(pulsar.ClientOptions{
Expand Down Expand Up @@ -458,6 +519,51 @@ if err != nil {
}
```

#### How to use schema in consumer

```go
type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}
```

```go
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
```

```go
client, err := NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

var s testJSON

consumerJS := NewJSONSchema(exampleSchemaDef, nil)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "jsonTopic",
SubscriptionName: "sub-1",
Schema: consumerJS,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
err = msg.GetSchemaValue(&s)
if err != nil {
log.Fatal(err)
}

defer consumer.Close()
```


### Consumer configuration

Expand All @@ -478,6 +584,11 @@ if err != nil {
| NackRedeliveryDelay | The delay after which to redeliver the messages that failed to be processed | 1min |
| ReadCompacted | If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic | false |
| ReplicateSubscriptionState | Mark the subscription as replicated to keep it in sync across clusters | false |
| KeySharedPolicy | Configuration for Key Shared consumer policy. | |
| RetryEnable | Auto retry send messages to default filled DLQPolicy topics | false |
| Interceptors | A chain of interceptors. These interceptors are called at some points defined in the `ConsumerInterceptor` interface. | |
| MaxReconnectToBroker | MaxReconnectToBroker set the maximum retry number of reconnectToBroker. | ultimate |
| Schema | Schema set a custom schema type by passing an implementation of `Schema` | bytes[] |

## Readers

Expand All @@ -504,6 +615,8 @@ Method | Description | Return type
`Next(context.Context)` | Receives the next message on the topic (analogous to the `Receive` method for [consumers](#consumer-operations)). This method blocks until a message is available. | `(Message, error)`
`HasNext()` | Check if there is any message available to read from the current position| (bool, error)
`Close()` | Closes the reader, disabling its ability to receive messages from the broker | `error`
`Seek(MessageID)` | Reset the subscription associated with this reader to a specific message ID | `error`
`SeekByTime(time time.Time)` | Reset the subscription associated with this reader to a specific message publish time | `error`

### Reader example

Expand Down Expand Up @@ -666,7 +779,9 @@ The following methods parameters are available for `ProducerMessage` objects:
Parameter | Description
:---------|:-----------
`Payload` | The actual data payload of the message
`Value` | Value and payload is mutually exclusive, `Value interface{}` for schema message.
`Key` | The optional key associated with the message (particularly useful for things like topic compaction)
`OrderingKey` | OrderingKey sets the ordering key of the message.
`Properties` | A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message
`EventTime` | The timestamp associated with the message
`ReplicationClusters` | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
Expand Down