diff --git a/pulsar/client.go b/pulsar/client.go index 8ff152a948..9be8463dbb 100644 --- a/pulsar/client.go +++ b/pulsar/client.go @@ -123,6 +123,9 @@ type ClientOptions struct { // Add custom labels to all the metrics reported by this client instance CustomMetricsLabels map[string]string + + // The default max message size in bytes. The broker default is 5,242,880 (5MB) but it can be increased. + DefaultMaxMessageSize int } // Client represents a pulsar client diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 56829270b6..26955009ee 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -102,6 +102,10 @@ func newClient(options ClientOptions) (Client, error) { operationTimeout = defaultOperationTimeout } + if options.DefaultMaxMessageSize < internal.MaxMessageSize { + options.DefaultMaxMessageSize = internal.MaxMessageSize + } + maxConnectionsPerHost := options.MaxConnectionsPerBroker if maxConnectionsPerHost <= 0 { maxConnectionsPerHost = 1 @@ -120,7 +124,7 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger, - metrics), + options.DefaultMaxMessageSize, metrics), log: logger, metrics: metrics, } diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index ecda4faf22..95c2928edf 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -172,13 +172,14 @@ type connection struct { // connectionOptions defines configurations for creating connection. type connectionOptions struct { - logicalAddr *url.URL - physicalAddr *url.URL - tls *TLSOptions - connectionTimeout time.Duration - auth auth.Provider - logger log.Logger - metrics *Metrics + logicalAddr *url.URL + physicalAddr *url.URL + tls *TLSOptions + connectionTimeout time.Duration + auth auth.Provider + logger log.Logger + defaultMaxMessageSize int32 + metrics *Metrics } func newConnection(opts connectionOptions) *connection { @@ -206,6 +207,7 @@ func newConnection(opts connectionOptions) *connection { listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), metrics: opts.metrics, + maxMessageSize: opts.defaultMaxMessageSize, } cnx.setState(connectionInit) cnx.reader = newConnectionReader(cnx) @@ -317,11 +319,8 @@ func (c *connection) doHandshake() bool { if cmd.Connected.MaxMessageSize != nil && *cmd.Connected.MaxMessageSize > 0 { c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize) c.maxMessageSize = *cmd.Connected.MaxMessageSize - } else { - c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize) - c.maxMessageSize = MaxMessageSize } - c.log.Info("Connection is ready") + c.log.Info("Connection is ready with maxMessageSize ", c.maxMessageSize) c.changeState(connectionReady) return true } diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 2728a73419..04828ca0d6 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -46,6 +46,7 @@ type connectionPool struct { maxConnectionsPerHost int32 roundRobinCnt int32 metrics *Metrics + defaultMaxMessageSize int log log.Logger } @@ -57,6 +58,7 @@ func NewConnectionPool( connectionTimeout time.Duration, maxConnectionsPerHost int, logger log.Logger, + defaultMaxMsgSize int, metrics *Metrics) ConnectionPool { return &connectionPool{ connections: make(map[string]*connection), @@ -66,6 +68,7 @@ func NewConnectionPool( maxConnectionsPerHost: int32(maxConnectionsPerHost), log: logger, metrics: metrics, + defaultMaxMessageSize: defaultMaxMsgSize, } } @@ -89,13 +92,14 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U if conn == nil { conn = newConnection(connectionOptions{ - logicalAddr: logicalAddr, - physicalAddr: physicalAddr, - tls: p.tlsOptions, - connectionTimeout: p.connectionTimeout, - auth: p.auth, - logger: p.log, - metrics: p.metrics, + logicalAddr: logicalAddr, + physicalAddr: physicalAddr, + tls: p.tlsOptions, + connectionTimeout: p.connectionTimeout, + auth: p.auth, + logger: p.log, + metrics: p.metrics, + defaultMaxMessageSize: int32(p.defaultMaxMessageSize), }) p.connections[key] = conn p.Unlock()