SQS Memory Leak #3063

Closed
2 of 3 tasks
silviolleite opened this issue Apr 16, 2025 · 3 comments
Labels
response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days.

silviolleite commented Apr 16, 2025

Acknowledgements

Describe the bug

It seems that the SQS client allocates heap memory and does not free it after requests.
I am receiving and deleting messages like a real worker would, and heap memory rises with each request without being freed.

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

Once I am done with the results of a request, I expect the heap to return to its pre-request state.

Current Behavior

The heap grows after each request and is not freed:

pprof alloc_space:

(pprof) top
Showing nodes accounting for 87.88MB, 41.17% of 213.47MB total
Dropped 102 nodes (cum <= 1.07MB)
Showing top 10 nodes out of 226
      flat  flat%   sum%        cum   cum%
   12.63MB  5.92%  5.92%    12.63MB  5.92%  encoding/json.(*Decoder).refill
      12MB  5.62% 11.54%       15MB  7.03%  github.com/aws/smithy-go/middleware.(*orderedIDs).Add
   10.50MB  4.92% 16.46%    10.50MB  4.92%  net/http.Header.Clone
    9.50MB  4.45% 20.91%     9.50MB  4.45%  strings.(*Builder).grow
    8.01MB  3.75% 24.66%    15.51MB  7.27%  encoding/json.unquote
    7.51MB  3.52% 28.18%     7.51MB  3.52%  encoding/json.unquoteBytes
    7.50MB  3.51% 31.69%        9MB  4.22%  github.com/aws/aws-sdk-go-v2/service/sqs.addProtocolFinalizerMiddlewares
    7.50MB  3.51% 35.21%     7.50MB  3.51%  strings.(*Builder).WriteString
    7.22MB  3.38% 38.59%     7.22MB  3.38%  net/http.init.func15
    5.50MB  2.58% 41.17%     5.50MB  2.58%  github.com/aws/smithy-go/middleware.(*orderedIDs).GetOrder
(pprof) list Add
Total: 213.47MB
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/aws/middleware.(*RequestUserAgent).AddSDKAgentKeyValue in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/aws/middleware/user_agent.go
         0   512.05kB (flat, cum)  0.23% of Total
         .          .    310:func (u *RequestUserAgent) AddSDKAgentKeyValue(keyType SDKAgentKeyType, key, value string) {
         .          .    311:   // TODO: should target sdkAgent
         .   512.05kB    312:   u.userAgent.AddKeyValue(keyType.string(), strings.Map(rules, key)+"#"+strings.Map(rules, value))
         .          .    313:}
         .          .    314:
         .          .    315:// AddCredentialsSource adds the credential source as a feature on the User-Agent string
         .          .    316:func (u *RequestUserAgent) AddCredentialsSource(source aws.CredentialSource) {
         .          .    317:   x, ok := credentialSourceToFeature[source]
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/aws/middleware.(*RequestUserAgent).AddUserAgentFeature in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/aws/middleware/user_agent.go
  512.10kB   512.10kB (flat, cum)  0.23% of Total
         .          .    299:func (u *RequestUserAgent) AddUserAgentFeature(feature UserAgentFeature) {
  512.10kB   512.10kB    300:   u.features[feature] = struct{}{}
         .          .    301:}
         .          .    302:
         .          .    303:// AddSDKAgentKey adds the component identified by name to the User-Agent string.
         .          .    304:func (u *RequestUserAgent) AddSDKAgentKey(keyType SDKAgentKeyType, key string) {
         .          .    305:   // TODO: should target sdkAgent
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/aws/middleware.AddRawResponse.HandleDeserialize in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/aws/middleware/middleware.go
         0    60.65MB (flat, cum) 28.41% of Total
         .          .    151:func (m AddRawResponse) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
         .          .    152:   out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
         .          .    153:) {
         .    60.65MB    154:   out, metadata, err = next.HandleDeserialize(ctx, in)
         .          .    155:   metadata.Set(rawResponseKey{}, out.RawResponse)
         .          .    156:   return out, metadata, err
         .          .    157:}
         .          .    158:
         .          .    159:// AddRawResponseToMetadata adds middleware to the middleware stack that
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/internal/middleware.(*AddTimeOffsetMiddleware).HandleDeserialize in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/internal/middleware/middleware.go
         0    26.01MB (flat, cum) 12.18% of Total
         .          .     35:func (m *AddTimeOffsetMiddleware) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
         .          .     36:   out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
         .          .     37:) {
         .          .     38:   if v := internalcontext.GetAttemptSkewContext(ctx); v != 0 {
         .          .     39:           m.Offset.Store(v.Nanoseconds())
         .          .     40:   }
         .    26.01MB     41:   return next.HandleDeserialize(ctx, in)
         .          .     42:}
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/internal/middleware.AddTimeOffsetMiddleware.HandleBuild in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/internal/middleware/middleware.go
         0   109.66MB (flat, cum) 51.37% of Total
         .          .     23:func (m AddTimeOffsetMiddleware) HandleBuild(ctx context.Context, in middleware.BuildInput, next middleware.BuildHandler) (
         .          .     24:   out middleware.BuildOutput, metadata middleware.Metadata, err error,
         .          .     25:) {
         .          .     26:   if m.Offset != nil {
         .          .     27:           offset := time.Duration(m.Offset.Load())
         .        1MB     28:           ctx = internalcontext.SetAttemptSkewContext(ctx, offset)
         .          .     29:   }
         .   108.66MB     30:   return next.HandleBuild(ctx, in)
         .          .     31:}
         .          .     32:
         .          .     33:// HandleDeserialize gets the clock skew context from the context, and if set, sets it on the pointer
         .          .     34:// held by AddTimeOffsetMiddleware
         .          .     35:func (m *AddTimeOffsetMiddleware) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
ROUTINE ======================== github.com/aws/aws-sdk-go-v2/service/sqs.getOrAddRequestUserAgent in /home/silvio.leite/go/pkg/mod/github.com/aws/aws-sdk-go-v2/service/[email protected]/api_client.go
         0     3.50MB (flat, cum)  1.64% of Total
         .          .    568:func getOrAddRequestUserAgent(stack *middleware.Stack) (*awsmiddleware.RequestUserAgent, error) {
         .          .    569:   id := (*awsmiddleware.RequestUserAgent)(nil).ID()
         .          .    570:   mw, ok := stack.Build.Get(id)
         .          .    571:   if !ok {
         .     3.50MB    572:           mw = awsmiddleware.NewRequestUserAgent()
         .          .    573:           if err := stack.Build.Add(mw, middleware.After); err != nil {
         .          .    574:                   return nil, err
         .          .    575:           }
         .          .    576:   }
         .          .    577:
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*BuildStep).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/step_build.go
         0        2MB (flat, cum)  0.94% of Total
         .          .    129:func (s *BuildStep) Add(m BuildMiddleware, pos RelativePosition) error {
         .        2MB    130:   return s.ids.Add(m, pos)
         .          .    131:}
         .          .    132:
         .          .    133:// Insert injects the middleware relative to an existing middleware id.
         .          .    134:// Returns an error if the original middleware does not exist, or the middleware
         .          .    135:// being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*DeserializeStep).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/step_deserialize.go
         0     8.50MB (flat, cum)  3.98% of Total
         .          .    135:func (s *DeserializeStep) Add(m DeserializeMiddleware, pos RelativePosition) error {
         .     8.50MB    136:   return s.ids.Add(m, pos)
         .          .    137:}
         .          .    138:
         .          .    139:// Insert injects the middleware relative to an existing middleware ID.
         .          .    140:// Returns error if the original middleware does not exist, or the middleware
         .          .    141:// being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*FinalizeStep).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/step_finalize.go
         0     1.50MB (flat, cum)   0.7% of Total
         .          .    129:func (s *FinalizeStep) Add(m FinalizeMiddleware, pos RelativePosition) error {
         .     1.50MB    130:   return s.ids.Add(m, pos)
         .          .    131:}
         .          .    132:
         .          .    133:// Insert injects the middleware relative to an existing middleware ID.
         .          .    134:// Returns error if the original middleware does not exist, or the middleware
         .          .    135:// being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*InitializeStep).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/step_initialize.go
         0     2.50MB (flat, cum)  1.17% of Total
         .          .    129:func (s *InitializeStep) Add(m InitializeMiddleware, pos RelativePosition) error {
         .     2.50MB    130:   return s.ids.Add(m, pos)
         .          .    131:}
         .          .    132:
         .          .    133:// Insert injects the middleware relative to an existing middleware ID.
         .          .    134:// Returns error if the original middleware does not exist, or the middleware
         .          .    135:// being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*SerializeStep).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/step_serialize.go
         0   512.14kB (flat, cum)  0.23% of Total
         .          .    137:func (s *SerializeStep) Add(m SerializeMiddleware, pos RelativePosition) error {
         .   512.14kB    138:   return s.ids.Add(m, pos)
         .          .    139:}
         .          .    140:
         .          .    141:// Insert injects the middleware relative to an existing middleware ID.
         .          .    142:// Returns error if the original middleware does not exist, or the middleware
         .          .    143:// being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*orderedIDs).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/ordered_group.go
      12MB       15MB (flat, cum)  7.03% of Total
         .          .     37:func (g *orderedIDs) Add(m ider, pos RelativePosition) error {
         .          .     38:   id := m.ID()
         .          .     39:   if len(id) == 0 {
         .          .     40:           return fmt.Errorf("empty ID, ID must not be empty")
         .          .     41:   }
         .          .     42:
         .        3MB     43:   if err := g.order.Add(pos, id); err != nil {
         .          .     44:           return err
         .          .     45:   }
         .          .     46:
      12MB       12MB     47:   g.items[id] = m
         .          .     48:   return nil
         .          .     49:}
         .          .     50:
         .          .     51:// Insert injects the item relative to an existing item id. Returns an error if
         .          .     52:// the original item does not exist, or the item being added already exists.
ROUTINE ======================== github.com/aws/smithy-go/middleware.(*relativeOrder).Add in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/middleware/ordered_group.go
       3MB        3MB (flat, cum)  1.41% of Total
         .          .    151:func (s *relativeOrder) Add(pos RelativePosition, ids ...string) error {
         .          .    152:   if len(ids) == 0 {
         .          .    153:           return nil
         .          .    154:   }
         .          .    155:
         .          .    156:   for _, id := range ids {
         .          .    157:           if _, ok := s.has(id); ok {
         .          .    158:                   return fmt.Errorf("already exists, %v", id)
         .          .    159:           }
         .          .    160:   }
         .          .    161:
         .          .    162:   switch pos {
         .          .    163:   case Before:
         .          .    164:           return s.insert(0, Before, ids...)
         .          .    165:
         .          .    166:   case After:
       3MB        3MB    167:           s.order = append(s.order, ids...)
         .          .    168:
         .          .    169:   default:
         .          .    170:           return fmt.Errorf("invalid position, %v", int(pos))
         .          .    171:   }
         .          .    172:
ROUTINE ======================== github.com/aws/smithy-go/transport/http.(*UserAgentBuilder).AddKeyValue in /home/silvio.leite/go/pkg/mod/github.com/aws/[email protected]/transport/http/user_agent.go
         0        2MB (flat, cum)  0.94% of Total
         .          .     23:func (u *UserAgentBuilder) AddKeyValue(key, value string) {
         .        2MB     24:   u.appendTo(key + "/" + value)
         .          .     25:}
         .          .     26:
         .          .     27:// Build returns the constructed User-Agent string. May be called multiple times.
         .          .     28:func (u *UserAgentBuilder) Build() string {
         .          .     29:   return u.sb.String()
(pprof)

Reproduction Steps

My consumer with pprof:

package main

import (
	"context"
	"log"
	"net"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof handlers on the default mux
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"golang.org/x/net/http2"
)

var (
	awsEndpoint    = "http://localhost:4566"
	awsKey         = "dummy"
	awsSecret      = "dummy"
	awsRegion      = "us-east-1"
	sqsMaxMessages = 10
	queueURL       = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/example-1"
)

func main() {
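	go func() {
		// Serve the pprof endpoints and report why the server stopped, if it does.
		if err := http.ListenAndServe("localhost:8080", nil); err != nil {
			log.Println("pprof server stopped:", err)
		}
	}()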

	ctx := context.Background()
	sqsClient := newSQSClient()
	chnMessages := make(chan types.Message, sqsMaxMessages)

	for w := int32(1); w <= 5; w++ {
		go worker(ctx, sqsClient, chnMessages)
	}

	for {
		pollSqs(ctx, sqsClient, chnMessages)
	}
}

func worker(ctx context.Context, client *sqs.Client, chnMessages <-chan types.Message) {
	for m := range chnMessages {
		handleMessage(m)
		deleteMessage(ctx, client, m)
	}
}

func pollSqs(ctx context.Context, client *sqs.Client, chnMessages chan<- types.Message) {
	output, rErr := client.ReceiveMessage(
		ctx,
		&sqs.ReceiveMessageInput{
			QueueUrl:                    &queueURL,
			WaitTimeSeconds:             20,
			MaxNumberOfMessages:         10,
			MessageAttributeNames:       []string{"All"},
			MessageSystemAttributeNames: []types.MessageSystemAttributeName{types.MessageSystemAttributeNameAll},
		},
	)
	if rErr != nil {
		log.Println("Error receiving messages:", rErr)
		// output is nil when the call fails, so return before dereferencing it.
		return
	}

	for _, m := range output.Messages {
		chnMessages <- m
	}
}

func handleMessage(message types.Message) {
	log.Println("Received message: ", *message.Body)
	// simulate real process
	time.Sleep(1 * time.Second)
}

func deleteMessage(ctx context.Context, client *sqs.Client, message types.Message) {
	// delete message after process
	_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      &queueURL,
		ReceiptHandle: message.ReceiptHandle,
	})
	if err != nil {
		log.Println("Error deleting message:", err)
	}
}

func newSQSClient() *sqs.Client {
	httpClient := newHTTPClientWithSettings()

	cfg, err := config.LoadDefaultConfig(
		context.TODO(),
		config.WithRegion(awsRegion),
		config.WithHTTPClient(httpClient),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(awsKey, awsSecret, "")),
	)
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}

	cfg.BaseEndpoint = &awsEndpoint
	// Create an SQS client with the configuration
	return sqs.NewFromConfig(cfg)
}

func newHTTPClientWithSettings() *http.Client {
	tr := &http.Transport{
		ResponseHeaderTimeout: 5 * time.Second,
		Proxy:                 http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: 30 * time.Second,
			Timeout:   5 * time.Second,
		}).DialContext,
		MaxIdleConns:          20,
		IdleConnTimeout:       30 * time.Second,
		TLSHandshakeTimeout:   5 * time.Second,
		MaxIdleConnsPerHost:   10,
		ExpectContinueTimeout: 1 * time.Second,
	}

	// So client makes HTTP/2 requests
	err := http2.ConfigureTransport(tr)
	if err != nil {
		log.Panic(err)
	}

	return &http.Client{
		Transport: tr,
	}
}

Pprof command:

go tool pprof -alloc_space http://localhost:8080/debug/pprof/heap

I produced 200000 messages to the topic for the test.

Possible Solution

No response

Additional Information/Context

No response

AWS Go SDK V2 Module Versions Used

	github.com/aws/aws-sdk-go-v2/config v1.29.14
	github.com/aws/aws-sdk-go-v2/credentials v1.17.67
	github.com/aws/aws-sdk-go-v2/service/sns v1.34.4
	github.com/aws/aws-sdk-go-v2/service/sqs v1.38.5

Compiler and Version used

go version go1.24.2 linux/amd64

Operating System and version

Alpine Linux 3.21.3

silviolleite added the bug and needs-triage labels on Apr 16, 2025
Madrigal (Contributor) commented

You are getting this reading because you're using -alloc_space. From the docs (emphasis mine):

The allocs profile is the same as the heap profile but changes the default pprof display to -alloc_space, the total number of bytes allocated since the program began (including garbage-collected bytes).

Since this program is doing constant I/O, it is expected that the memory reported as allocated will grow across the lifetime of the program. The memory itself will be freed in time by the GC.

Do you see the same behavior with the default inuse_space?
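
To see the difference outside of pprof, here is a minimal sketch. It is not part of the SDK, and the helper names churn and heapStats are made up for illustration. It uses runtime.MemStats, whose TotalAlloc counter is cumulative in the same way as alloc_space, and whose HeapAlloc counter tracks memory still in use in the same way as inuse_space. MemStats counters are exact while heap profiles are sampled, so the numbers will not match pprof output exactly.

```go
package main

import (
	"fmt"
	"runtime"
)

// sink keeps the most recent buffer reachable so the compiler cannot
// optimize the allocations away.
var sink []byte

// churn (hypothetical helper) allocates n buffers of size bytes each and
// then drops every reference, so all of them become garbage.
func churn(n, size int) {
	for i := 0; i < n; i++ {
		sink = make([]byte, size)
	}
	sink = nil
}

// heapStats (hypothetical helper) forces a collection and returns the
// cumulative bytes allocated since program start and the bytes of heap
// objects that are still allocated after the collection.
func heapStats() (totalAlloc, heapAlloc uint64) {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.TotalAlloc, m.HeapAlloc
}

func main() {
	t0, l0 := heapStats()
	churn(200, 1<<20) // roughly 200 MiB of short-lived allocations
	t1, l1 := heapStats()

	fmt.Printf("cumulative allocated: +%d MiB\n", (t1-t0)>>20)
	fmt.Printf("live heap before: %d KiB, after: %d KiB\n", l0>>10, l1>>10)
}
```

The cumulative counter only ever goes up, while the live heap should settle back near its starting value once the garbage has been collected. A leak would show up as growth in the second number, not the first.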

Madrigal added the response-requested label and removed the bug and needs-triage labels on Apr 16, 2025
silviolleite (Author) commented

🤦 No, in inuse_space it is running correctly. Thanks @Madrigal
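
For anyone repeating this check, the same endpoint can be read with `go tool pprof -inuse_space http://localhost:8080/debug/pprof/heap`. A heap profile can also be captured in-process. The following is a rough sketch only: captureHeapProfile is a hypothetical helper, not something from the SDK or the reproduction above.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"runtime/pprof"
)

// captureHeapProfile (hypothetical helper) forces a GC so the profile
// reflects up-to-date statistics, then returns the heap profile in pprof's
// gzip-compressed protobuf format (debug=0).
func captureHeapProfile() ([]byte, error) {
	runtime.GC()
	var buf bytes.Buffer
	if err := pprof.Lookup("heap").WriteTo(&buf, 0); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	prof, err := captureHeapProfile()
	if err != nil {
		fmt.Println("capturing heap profile:", err)
		return
	}
	fmt.Printf("captured %d bytes of heap profile\n", len(prof))
}
```

The same profile carries both the alloc_space and inuse_space sample types, so once it is written to a file, either view can be selected when opening it with go tool pprof.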

This issue is now closed. Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one.
