Skip to content

Commit 794cd24

Browse files
authored
[receiver/sqlquery] Fix memory leak and failing tests (#31785)
**Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> This PR includes two changes that are dependent on each other. 1. Fix test failing in #31778. Explanation given [here.](#31778 (comment)) All changes in `integration_test.go` are related to this. 2. When the test was fixed, `goleak` started failing. The logs receiver opens a DB connection when it's started, but shutdown does not close the DB. This DB needs to be closed during shutdown to avoid a leaked goroutine. All changes outside of `integration_test.go` are for this. 3. Since the memory leak changes were modifying errors, I moved from using `multierr.append` to `errors.Join` as well. **Link to tracking Issue:** <Issue number if applicable> Resolves #31782 Related to #31778 **Testing:** <Describe what testing was performed and which tests were added.> Tests are passing
1 parent d82d6f5 commit 794cd24

File tree

4 files changed

+51
-17
lines changed

4 files changed

+51
-17
lines changed

.chloggen/sqlquery_shutdown.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: sqlqueryreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix memory leak on shutdown for log telemetry
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31782]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/sqlqueryreceiver/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ require (
2020
go.opentelemetry.io/otel/metric v1.24.0
2121
go.opentelemetry.io/otel/trace v1.24.0
2222
go.uber.org/goleak v1.3.0
23-
go.uber.org/multierr v1.11.0
2423
go.uber.org/zap v1.27.0
2524
)
2625

@@ -141,6 +140,7 @@ require (
141140
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
142141
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
143142
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
143+
go.uber.org/multierr v1.11.0 // indirect
144144
golang.org/x/crypto v0.21.0 // indirect
145145
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect
146146
golang.org/x/mod v0.14.0 // indirect

receiver/sqlqueryreceiver/integration_test.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.opentelemetry.io/collector/component/componenttest"
2525
"go.opentelemetry.io/collector/consumer/consumertest"
2626
"go.opentelemetry.io/collector/pdata/plog"
27+
"go.opentelemetry.io/collector/receiver"
2728
"go.opentelemetry.io/collector/receiver/receivertest"
2829
"go.uber.org/zap"
2930

@@ -48,7 +49,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
4849
}()
4950

5051
// Start the SQL Query receiver.
51-
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort)
52+
receiverCreateSettings := receivertest.NewNopCreateSettings()
53+
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
5254
config.CollectionInterval = time.Second
5355
config.Queries = []sqlquery.Query{
5456
{
@@ -84,7 +86,7 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
8486
require.NoError(t, err)
8587

8688
// Start new SQL Query receiver with the same configuration.
87-
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
89+
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
8890
config.CollectionInterval = time.Second
8991
config.Queries = []sqlquery.Query{
9092
{
@@ -134,7 +136,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
134136
storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir)
135137

136138
// create SQL Query receiver configured with the File Storage extension
137-
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort)
139+
receiverCreateSettings := receivertest.NewNopCreateSettings()
140+
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
138141
config.CollectionInterval = time.Second
139142
config.StorageID = &storageExtension.ID
140143
config.Queries = []sqlquery.Query{
@@ -176,7 +179,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
176179
testAllSimpleLogs(t, consumer.AllLogs())
177180

178181
// start the SQL Query receiver again
179-
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
182+
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
180183
config.CollectionInterval = time.Second
181184
config.StorageID = &storageExtension.ID
182185
config.Queries = []sqlquery.Query{
@@ -209,7 +212,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
209212
insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount)
210213

211214
// start the SQL Query receiver again
212-
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
215+
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
213216
config.CollectionInterval = time.Second
214217
config.StorageID = &storageExtension.ID
215218
config.Queries = []sqlquery.Query{
@@ -276,15 +279,14 @@ func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers.
276279
return container
277280
}
278281

279-
func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logsReceiver, *Config, *consumertest.LogsSink) {
282+
func createTestLogsReceiverForPostgres(t *testing.T, externalPort string, receiverCreateSettings receiver.CreateSettings) (*logsReceiver, *Config, *consumertest.LogsSink) {
280283
factory := NewFactory()
281284
config := factory.CreateDefaultConfig().(*Config)
282285
config.CollectionInterval = time.Second
283286
config.Driver = "postgres"
284287
config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort)
285288

286289
consumer := &consumertest.LogsSink{}
287-
receiverCreateSettings := receivertest.NewNopCreateSettings()
288290
receiverCreateSettings.Logger = zap.NewExample()
289291
receiver, err := factory.CreateLogsReceiver(
290292
context.Background(),

receiver/sqlqueryreceiver/logs_receiver.go

+14-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-coll
66
import (
77
"context"
88
"database/sql"
9+
"errors"
910
"fmt"
1011
"time"
1112

@@ -16,7 +17,6 @@ import (
1617
"go.opentelemetry.io/collector/pdata/plog"
1718
"go.opentelemetry.io/collector/receiver"
1819
"go.opentelemetry.io/collector/receiver/receiverhelper"
19-
"go.uber.org/multierr"
2020
"go.uber.org/zap"
2121

2222
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
@@ -174,21 +174,21 @@ func (receiver *logsReceiver) Shutdown(ctx context.Context) error {
174174
return nil
175175
}
176176

177+
var errs []error
177178
receiver.settings.Logger.Debug("stopping...")
178179
receiver.stopCollecting()
179180
for _, queryReceiver := range receiver.queryReceivers {
180-
queryReceiver.shutdown(ctx)
181+
errs = append(errs, queryReceiver.shutdown(ctx))
181182
}
182183

183-
var errors error
184184
if receiver.storageClient != nil {
185-
errors = multierr.Append(errors, receiver.storageClient.Close(ctx))
185+
errs = append(errs, receiver.storageClient.Close(ctx))
186186
}
187187

188188
receiver.isStarted = false
189189
receiver.settings.Logger.Debug("stopped.")
190190

191-
return errors
191+
return errors.Join(errs...)
192192
}
193193

194194
func (receiver *logsReceiver) stopCollecting() {
@@ -286,19 +286,19 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs,
286286
return logs, fmt.Errorf("error getting rows: %w", err)
287287
}
288288

289-
var errs error
289+
var errs []error
290290
scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
291291
for logsConfigIndex, logsConfig := range queryReceiver.query.Logs {
292292
for _, row := range rows {
293293
logRecord := scopeLogs.AppendEmpty()
294294
rowToLog(row, logsConfig, logRecord)
295295
logRecord.SetObservedTimestamp(observedAt)
296296
if logsConfigIndex == 0 {
297-
errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row))
297+
errs = append(errs, queryReceiver.storeTrackingValue(ctx, row))
298298
}
299299
}
300300
}
301-
return logs, nil
301+
return logs, errors.Join(errs...)
302302
}
303303

304304
func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row sqlquery.StringMap) error {
@@ -319,5 +319,10 @@ func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.Lo
319319
logRecord.Body().SetStr(row[config.BodyColumn])
320320
}
321321

322-
func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) {
322+
func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error {
323+
if queryReceiver.db == nil {
324+
return nil
325+
}
326+
327+
return queryReceiver.db.Close()
323328
}

0 commit comments

Comments
 (0)