From 62f11dae63a07b36f1a00953d867ee1c5be35d50 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 15 Mar 2024 15:20:25 -0700 Subject: [PATCH 1/5] i This is a combination of 2 commits. [receiver/sqlquery] Fix memory leak --- .chloggen/sqlquery_shutdown.yaml | 27 ++++++++++++++++++++++ receiver/sqlqueryreceiver/logs_receiver.go | 11 ++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 .chloggen/sqlquery_shutdown.yaml diff --git a/.chloggen/sqlquery_shutdown.yaml b/.chloggen/sqlquery_shutdown.yaml new file mode 100644 index 000000000000..387ffed5d5f3 --- /dev/null +++ b/.chloggen/sqlquery_shutdown.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sqlqueryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak on shutdown for log telemetry + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31782] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index d689397b4922..ede4af999da0 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -174,13 +174,13 @@ func (receiver *logsReceiver) Shutdown(ctx context.Context) error { return nil } + var errors error receiver.settings.Logger.Debug("stopping...") receiver.stopCollecting() for _, queryReceiver := range receiver.queryReceivers { - queryReceiver.shutdown(ctx) + errors = multierr.Append(errors, queryReceiver.shutdown(ctx)) } - var errors error if receiver.storageClient != nil { errors = multierr.Append(errors, receiver.storageClient.Close(ctx)) } @@ -319,5 +319,10 @@ func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.Lo logRecord.Body().SetStr(row[config.BodyColumn]) } -func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) { +func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error { + if queryReceiver.db == nil { + return nil + } + + return queryReceiver.db.Close() } From 478fb7aff961a6055a0f0a3f9b237699022c2f5d Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 15 Mar 2024 15:06:14 -0700 Subject: [PATCH 2/5] Fix no-op unique ID change --- receiver/sqlqueryreceiver/integration_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index f8ba19dc4737..5f66d1e63b08 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -48,7 +49,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { }() // Start the SQL Query receiver. - receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + receiverCreateSettings := receivertest.NewNopCreateSettings() + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) config.CollectionInterval = time.Second config.Queries = []sqlquery.Query{ { @@ -84,7 +86,7 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { require.NoError(t, err) // Start new SQL Query receiver with the same configuration. - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) config.CollectionInterval = time.Second config.Queries = []sqlquery.Query{ { @@ -134,7 +136,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) // create SQL Query receiver configured with the File Storage extension - receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + receiverCreateSettings := receivertest.NewNopCreateSettings() + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) config.CollectionInterval = time.Second config.StorageID = &storageExtension.ID config.Queries = []sqlquery.Query{ @@ -176,7 +179,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { testAllSimpleLogs(t, consumer.AllLogs()) // start the SQL Query receiver again - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) config.CollectionInterval = time.Second config.StorageID = &storageExtension.ID config.Queries = []sqlquery.Query{ @@ -209,7 +212,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount) // start the SQL Query receiver again - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) config.CollectionInterval = time.Second config.StorageID = &storageExtension.ID config.Queries = []sqlquery.Query{ @@ -276,7 +279,7 @@ func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers. return container } -func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logsReceiver, *Config, *consumertest.LogsSink) { +func createTestLogsReceiverForPostgres(t *testing.T, externalPort string, receiverCreateSettings receiver.CreateSettings) (*logsReceiver, *Config, *consumertest.LogsSink) { factory := NewFactory() config := factory.CreateDefaultConfig().(*Config) config.CollectionInterval = time.Second @@ -284,7 +287,6 @@ func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logs config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort) consumer := &consumertest.LogsSink{} - receiverCreateSettings := receivertest.NewNopCreateSettings() receiverCreateSettings.Logger = zap.NewExample() receiver, err := factory.CreateLogsReceiver( context.Background(), From 91b29410ff68fd11e212803aa445b389e2faf92c Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 15 Mar 2024 15:51:31 -0700 Subject: [PATCH 3/5] Use errors.Join instead of multierr --- receiver/sqlqueryreceiver/logs_receiver.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index ede4af999da0..6eeceafb5ea0 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -6,6 +6,7 @@ package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "database/sql" + "errors" "fmt" "time" @@ -16,7 +17,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery" @@ -174,21 +174,21 @@ func (receiver *logsReceiver) Shutdown(ctx context.Context) error { return nil } - var errors error + var errs []error receiver.settings.Logger.Debug("stopping...") receiver.stopCollecting() for _, queryReceiver := range receiver.queryReceivers { - errors = multierr.Append(errors, queryReceiver.shutdown(ctx)) + errs = append(errs, queryReceiver.shutdown(ctx)) } if receiver.storageClient != nil { - errors = multierr.Append(errors, receiver.storageClient.Close(ctx)) + errs = append(errs, receiver.storageClient.Close(ctx)) } receiver.isStarted = false receiver.settings.Logger.Debug("stopped.") - return errors + return errors.Join(errs...) } func (receiver *logsReceiver) stopCollecting() { @@ -286,7 +286,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, return logs, fmt.Errorf("error getting rows: %w", err) } - var errs error + var errs []error scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() for logsConfigIndex, logsConfig := range queryReceiver.query.Logs { for _, row := range rows { @@ -294,7 +294,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, rowToLog(row, logsConfig, logRecord) logRecord.SetObservedTimestamp(observedAt) if logsConfigIndex == 0 { - errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row)) + errs = append(errs, queryReceiver.storeTrackingValue(ctx, row)) } } } From 8144a5857e8d11fd5e420a104634216591da3e0d Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 15 Mar 2024 15:55:21 -0700 Subject: [PATCH 4/5] make gotidy --- receiver/sqlqueryreceiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/sqlqueryreceiver/go.mod b/receiver/sqlqueryreceiver/go.mod index 5d485e809876..3dc7a02c1043 100644 --- a/receiver/sqlqueryreceiver/go.mod +++ b/receiver/sqlqueryreceiver/go.mod @@ -20,7 +20,6 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 ) @@ -141,6 +140,7 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect golang.org/x/mod v0.14.0 // indirect From 2d584b2acb538253372c4352235d8d25a3f836ad Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 15 Mar 2024 16:11:28 -0700 Subject: [PATCH 5/5] Return errors that are compiled --- receiver/sqlqueryreceiver/logs_receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index 6eeceafb5ea0..d343b8f48538 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -298,7 +298,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, } } } - return logs, nil + return logs, errors.Join(errs...) } func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row sqlquery.StringMap) error {