Skip to content

feat(bigquery/storage/managedwriter): expose connection multiplexing as experimental #7673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 7, 2023
24 changes: 24 additions & 0 deletions bigquery/storage/managedwriter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,29 @@ With write retries enabled, failed writes will be automatically attempted a fini
In support of the retry changes, the AppendResult returned as part of an append call now includes
TotalAttempts(), which returns the number of times that specific append was enqueued to the service.
Values larger than 1 are indicative of a specific append being enqueued multiple times.

# Connection Sharing (Multiplexing)

Note: This feature is EXPERIMENTAL and subject to change.

The BigQuery Write API enforces a limit on the number of concurrent open connections, documented
here: https://cloud.google.com/bigquery/quotas#write-api-limits

Users can now choose to enable connection sharing (multiplexing) when using ManagedStream writers
that use default streams. The intent of this feature is to simplify connection management for users
who wish to write to many tables, at a cardinality beyond the open connection quota. Please note that
explicit streams (Committed, Buffered, and Pending) cannot leverage the connection sharing feature.

Multiplexing features are controlled by the package-specific custom ClientOption options exposed within
this package. Additionally, some of the connection-related WriterOptions that can be specified when
constructing ManagedStream writers are ignored for writers that leverage the shared multiplex connections.

At a high level, multiplexing uses some heuristics based on the flow control of the shared connections
to infer whether the pool should add additional connections up to a user-specific limit per region,
and attempts to balance traffic from writers to those connections.

Special Consideration: Users who would like to utilize many connections associated with a single Client
may benefit from setting the WithGRPCConnectionPool ClientOption, documented here:
https://pkg.go.dev/google.golang.org/api/option#WithGRPCConnectionPool
*/
package managedwriter
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
}

func TestIntegration_MultiplexWrites(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t, enableMultiplex(true, 5))
mwClient, bqClient := getTestClients(context.Background(), t, EnableMultiplexing(true))
defer mwClient.Close()
defer bqClient.Close()

Expand Down
99 changes: 80 additions & 19 deletions bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
wOpt.ApplyWriterOpt(conf)
}
}

// Normalize the config to ensure we're dealing with sane values.
if conf.useMultiplex {
if conf.maxMultiplexPoolSize < 1 {
conf.maxMultiplexPoolSize = 1
}
}
if conf.defaultInflightBytes < 0 {
conf.defaultInflightBytes = 0
}
if conf.defaultInflightRequests < 0 {
conf.defaultInflightRequests = 0
}
return conf
}

Expand All @@ -48,31 +61,55 @@ type writerClientOption interface {
ApplyWriterOpt(*writerClientConfig)
}

// enableMultiplex enables multiplex behavior in the client.
// maxSize indicates the maximum number of shared multiplex connections
// in a given location/region
// EnableMultiplexing is an EXPERIMENTAL option that enables connection sharing
// when instantiating the Client. Only writes to default streams can leverage the
// multiplex pool. Internally, the client maintains a pool of connections per BigQuery
// destination region, and will grow the pool to it's maximum allowed size if there's
// sufficient traffic on the shared connection(s).
//
// TODO: export this as part of the multiplex feature launch.
func enableMultiplex(enable bool, maxSize int) option.ClientOption {
return &enableMultiplexSetting{useMultiplex: enable, maxSize: maxSize}
// This ClientOption is EXPERIMENTAL and subject to change.
func EnableMultiplexing(enable bool) option.ClientOption {
return &enableMultiplexSetting{useMultiplex: enable}
}

type enableMultiplexSetting struct {
internaloption.EmbeddableAdapter
useMultiplex bool
maxSize int
}

func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) {
c.useMultiplex = s.useMultiplex
}

// MaxMultiplexPoolSize is an EXPERIMENTAL option that sets the maximum
// shared multiplex pool size when instantiating the Client. If multiplexing
// is not enabled, this setting is ignored. By default, the limit is a single
// shared connection.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func MaxMultiplexPoolSize(maxSize int) option.ClientOption {
return &maxMultiplexPoolSizeSetting{maxSize: maxSize}
}

type maxMultiplexPoolSizeSetting struct {
internaloption.EmbeddableAdapter
maxSize int
}

func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(c *writerClientConfig) {
c.maxMultiplexPoolSize = s.maxSize
}

// defaultMaxInflightRequests sets the default flow controller limit for requests for
// all AppendRows connections created by this client.
// DefaultMaxInflightRequests is an EXPERIMENTAL ClientOption for controlling
// the default limit of how many individual AppendRows write requests can
// be in flight on a connection at a time. This limit is enforced on all connections
// created by the instantiated Client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightRequests(n int) option.ClientOption {
// Note: the WithMaxInflightRequests WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func DefaultMaxInflightRequests(n int) option.ClientOption {
return &defaultInflightRequestsSetting{maxRequests: n}
}

Expand All @@ -85,11 +122,16 @@ func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightRequests = s.maxRequests
}

// defaultMaxInflightBytes sets the default flow controller limit for bytes for
// all AppendRows connections created by this client.
// DefaultMaxInflightBytes is an EXPERIMENTAL ClientOption for controlling
// the default byte limit for how many individual AppendRows write requests can
// be in flight on a connection at a time. This limit is enforced on all connections
// created by the instantiated Client.
//
// Note: the WithMaxInflightBytes WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightBytes(n int) option.ClientOption {
// This ClientOption is EXPERIMENTAL and subject to change.
func DefaultMaxInflightBytes(n int) option.ClientOption {
return &defaultInflightBytesSetting{maxBytes: n}
}

Expand All @@ -102,11 +144,18 @@ func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightBytes = s.maxBytes
}

// defaultAppendRowsCallOptions sets a gax.CallOption passed when opening
// the AppendRows bidi connection.
// DefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption for controlling
// the gax.CallOptions passed when opening the underlying AppendRows bidi
// stream connections used by this library to communicate with the BigQuery
// Storage service. This option is propagated to all
// connections created by the instantiated Client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
// Note: the WithAppendRowsCallOption WriterOption can still be used to control
// the behavior for individual ManagedStream writers that don't participate
// in multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func DefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
return &defaultAppendRowsCallOptionSetting{opt: o}
}

Expand Down Expand Up @@ -152,13 +201,21 @@ func WithDestinationTable(destTable string) WriterOption {
}

// WithMaxInflightRequests bounds the inflight appends on the write connection.
//
// Note: See the DefaultMaxInflightRequests ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightRequests(n int) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.MaxInflightRequests = n
}
}

// WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
//
// Note: See the DefaultMaxInflightBytes ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightBytes(n int) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.MaxInflightBytes = n
Expand Down Expand Up @@ -191,6 +248,10 @@ func WithDataOrigin(dataOrigin string) WriterOption {

// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
// it opens the underlying append stream.
//
// Note: See the DefaultAppendRowsCallOption ClientOption for setting defaults
// when instantiating a client, rather than setting this limit per-writer. This WriterOption
// is ignored for ManagedStream writers that participate in multiplexing.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o)
Expand Down
45 changes: 35 additions & 10 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,28 @@ func TestCustomClientOptions(t *testing.T) {
want: &writerClientConfig{},
},
{
desc: "multiplex",
desc: "multiplex enable",
options: []option.ClientOption{
enableMultiplex(true, 4),
EnableMultiplexing(true),
},
want: &writerClientConfig{
useMultiplex: true,
maxMultiplexPoolSize: 4,
maxMultiplexPoolSize: 1,
},
},
{
desc: "multiplex max",
options: []option.ClientOption{
MaxMultiplexPoolSize(99),
},
want: &writerClientConfig{
maxMultiplexPoolSize: 99,
},
},
{
desc: "default requests",
options: []option.ClientOption{
defaultMaxInflightRequests(42),
DefaultMaxInflightRequests(42),
},
want: &writerClientConfig{
defaultInflightRequests: 42,
Expand All @@ -57,7 +66,7 @@ func TestCustomClientOptions(t *testing.T) {
{
desc: "default bytes",
options: []option.ClientOption{
defaultMaxInflightBytes(123),
DefaultMaxInflightBytes(123),
},
want: &writerClientConfig{
defaultInflightBytes: 123,
Expand All @@ -66,21 +75,37 @@ func TestCustomClientOptions(t *testing.T) {
{
desc: "default call options",
options: []option.ClientOption{
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
DefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
{
desc: "unusual values",
options: []option.ClientOption{
EnableMultiplexing(true),
MaxMultiplexPoolSize(-8),
DefaultMaxInflightBytes(-1),
DefaultMaxInflightRequests(-99),
},
want: &writerClientConfig{
useMultiplex: true,
maxMultiplexPoolSize: 1,
defaultInflightRequests: 0,
defaultInflightBytes: 0,
},
},
{
desc: "multiple options",
options: []option.ClientOption{
enableMultiplex(true, 10),
defaultMaxInflightRequests(99),
defaultMaxInflightBytes(12345),
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
EnableMultiplexing(true),
MaxMultiplexPoolSize(10),
DefaultMaxInflightRequests(99),
DefaultMaxInflightBytes(12345),
DefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
useMultiplex: true,
Expand Down