Skip to content

Commit d6bf190

Browse files
Return an error when you try to send a message that's too large. (#20721)
This now works just like the message batch - you'll get an ErrMessageTooLarge if you attempt to send a message that's too large for the link's configured size. NOTE: there's a patch to `internal/go-amqp/Sender.go` to match what's in go-amqp's main so it returns a programmatically useful error when the message is too large. Fixes #20647
1 parent 9111616 commit d6bf190

File tree

4 files changed

+66
-6
lines changed

4 files changed

+66
-6
lines changed

sdk/messaging/azservicebus/internal/go-amqp/sender.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
101101
maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
102102
)
103103
if len(msg.DeliveryTag) > maxDeliveryTagLength {
104-
return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
104+
return nil, &Error{
105+
Condition: ErrCondMessageSizeExceeded,
106+
Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)),
107+
}
105108
}
106109

107110
s.mu.Lock()
@@ -114,7 +117,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
114117
}
115118

116119
if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize {
117-
return nil, fmt.Errorf("encoded message size exceeds max of %d", s.l.maxMessageSize)
120+
return nil, &Error{
121+
Condition: ErrCondMessageSizeExceeded,
122+
Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize),
123+
}
118124
}
119125

120126
senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled

sdk/messaging/azservicebus/message_batch.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
)
1313

1414
// ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add()
15-
var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")
15+
// or if the message is being sent on its own and is too large for the link.
16+
var ErrMessageTooLarge = errors.New("the message is too large")
1617

1718
type (
1819
// MessageBatch represents a batch of messages to send to Service Bus in a single message

sdk/messaging/azservicebus/sender.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package azservicebus
55

66
import (
77
"context"
8+
"errors"
89
"time"
910

1011
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
@@ -33,7 +34,7 @@ type MessageBatchOptions struct {
3334
// NewMessageBatch can be used to create a batch that contain multiple
3435
// messages. Sending a batch of messages is more efficient than sending the
3536
// messages one at a time.
36-
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
37+
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
3738
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error) {
3839
var batch *MessageBatch
3940

@@ -61,7 +62,9 @@ type SendMessageOptions struct {
6162
}
6263

6364
// SendMessage sends a Message to a queue or topic.
64-
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
65+
// If the operation fails it can return:
66+
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
67+
// - An [*azservicebus.Error] type if the failure is actionable.
6568
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error {
6669
return s.sendMessage(ctx, message)
6770
}
@@ -74,7 +77,9 @@ type SendAMQPAnnotatedMessageOptions struct {
7477
// SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic.
7578
// Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better
7679
// interoperability with pure AMQP clients.
77-
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
80+
// If the operation fails it can return:
81+
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
82+
// - An [*azservicebus.Error] type if the failure is actionable.
7883
func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error {
7984
return s.sendMessage(ctx, message)
8085
}
@@ -171,6 +176,10 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage)
171176
return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil)
172177
}, RetryOptions(s.retryOptions))
173178

179+
if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded {
180+
return ErrMessageTooLarge
181+
}
182+
174183
return internal.TransformError(err)
175184
}
176185

sdk/messaging/azservicebus/sender_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,3 +734,47 @@ func (rm receivedMessages) Less(i, j int) bool {
734734
func (rm receivedMessages) Swap(i, j int) {
735735
rm[i], rm[j] = rm[j], rm[i]
736736
}
737+
738+
func Test_Sender_Send_MessageTooBig(t *testing.T) {
739+
client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
740+
ClientOptions: &ClientOptions{
741+
RetryOptions: RetryOptions{
742+
// This is a purposefully ridiculous wait time but we'll never hit it
743+
// because exceeding the max message size is NOT a retryable error.
744+
RetryDelay: time.Hour,
745+
},
746+
},
747+
QueueProperties: &admin.QueueProperties{
748+
EnablePartitioning: to.Ptr(true),
749+
}})
750+
defer cleanup()
751+
752+
sender, err := client.NewSender(queueName, nil)
753+
require.NoError(t, err)
754+
755+
hugePayload := []byte{}
756+
757+
for i := 0; i < 1000*1000; i++ {
758+
hugePayload = append(hugePayload, 100)
759+
}
760+
761+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
762+
defer cancel()
763+
err = sender.SendMessage(ctx, &Message{
764+
MessageID: to.Ptr("message with a message ID"),
765+
Body: hugePayload,
766+
}, nil)
767+
768+
require.ErrorIs(t, err, ErrMessageTooLarge)
769+
770+
ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
771+
defer cancel()
772+
773+
err = sender.SendAMQPAnnotatedMessage(ctx, &AMQPAnnotatedMessage{
774+
Body: AMQPAnnotatedMessageBody{
775+
Data: [][]byte{hugePayload},
776+
},
777+
}, nil)
778+
779+
require.ErrorIs(t, err, ErrMessageTooLarge)
780+
}

0 commit comments

Comments
 (0)