Skip to content

Commit 46b3614

Browse files
[azservicebus] Updating batching to allow for a configurable wait time (#22154)
Updating batching to allow for a configurable wait time. Can lead to fuller batches for people that want to tune it. Fixes #19172
1 parent bdf62de commit 46b3614

File tree

4 files changed

+90
-18
lines changed

4 files changed

+90
-18
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
# Release History
22

3-
## 1.5.1 (Unreleased)
3+
## 1.5.1 (2024-01-16)
44

55
### Features Added
66

7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
7+
- ReceiverOptions.TimeAfterFirstMessage lets you configure the amount of time, after the first message in a batch is received, before we return messages. (PR#22154)
128

139
## 1.5.0 (2023-10-10)
1410

sdk/messaging/azservicebus/receiver.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ type Receiver struct {
4747
amqpLinks internal.AMQPLinks
4848
cancelReleaser *atomic.Value
4949
cleanupOnClose func()
50-
defaultTimeAfterFirstMsg time.Duration
5150
entityPath string
5251
lastPeekedSequenceNumber int64
5352
maxAllowedCredits uint32
@@ -131,7 +130,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
131130
receiver := &Receiver{
132131
cancelReleaser: &atomic.Value{},
133132
cleanupOnClose: args.cleanupOnClose,
134-
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
135133
lastPeekedSequenceNumber: 0,
136134
maxAllowedCredits: defaultLinkRxBuffer,
137135
retryOptions: args.retryOptions,
@@ -143,13 +141,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
143141
return nil, err
144142
}
145143

146-
if receiver.receiveMode == ReceiveModeReceiveAndDelete {
147-
// TODO: there appears to be a bit more overhead when receiving messages
148-
// in ReceiveAndDelete. Need to investigate if this is related to our
149-
// auto-accepting logic in go-amqp.
150-
receiver.defaultTimeAfterFirstMsg = time.Second
151-
}
152-
153144
newLinkFn := receiver.newReceiverLink
154145

155146
if args.newLinkFn != nil {
@@ -181,7 +172,13 @@ func (r *Receiver) newReceiverLink(ctx context.Context, session amqpwrap.AMQPSes
181172

182173
// ReceiveMessagesOptions are options for the ReceiveMessages function.
183174
type ReceiveMessagesOptions struct {
184-
// For future expansion
175+
// TimeAfterFirstMessage controls how long, after a message has been received, before we return the
176+
// accumulated batch of messages.
177+
//
178+
// Default value depends on the receive mode:
179+
// - 20ms when the receiver is in ReceiveModePeekLock
180+
// - 1s when the receiver is in ReceiveModeReceiveAndDelete
181+
TimeAfterFirstMessage time.Duration
185182
}
186183

187184
// ReceiveMessages receives a fixed number of messages, up to numMessages.
@@ -400,7 +397,15 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
400397
r.amqpLinks.Writef(EventReceiver, "Have %d credits, no new credits needed", currentReceiverCredits)
401398
}
402399

403-
result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, r.defaultTimeAfterFirstMsg)
400+
timeAfterFirstMessage := 20 * time.Millisecond
401+
402+
if options != nil && options.TimeAfterFirstMessage > 0 {
403+
timeAfterFirstMessage = options.TimeAfterFirstMessage
404+
} else if r.receiveMode == ReceiveModeReceiveAndDelete {
405+
timeAfterFirstMessage = time.Second
406+
}
407+
408+
result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, timeAfterFirstMessage)
404409

405410
r.amqpLinks.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages)
406411

sdk/messaging/azservicebus/receiver_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package azservicebus
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"regexp"
1011
"sort"
@@ -941,6 +942,77 @@ func TestReceiveAndSendAndReceive(t *testing.T) {
941942
require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending")
942943
}
943944

945+
func TestReceiveWithDifferentWaitTime(t *testing.T) {
946+
setup := func(t *testing.T, timeAfterFirstMessage *time.Duration) int {
947+
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
948+
defer cleanup()
949+
950+
sender, err := serviceBusClient.NewSender(queueName, nil)
951+
require.NoError(t, err)
952+
defer sender.Close(context.Background())
953+
954+
batch, err := sender.NewMessageBatch(context.Background(), nil)
955+
require.NoError(t, err)
956+
957+
bigBody := make([]byte, 1000)
958+
959+
// send a bunch of messages
960+
for i := 0; i < 1000; i++ {
961+
err := batch.AddMessage(&Message{
962+
Body: bigBody,
963+
}, nil)
964+
965+
if errors.Is(err, ErrMessageTooLarge) {
966+
err = sender.SendMessageBatch(context.Background(), batch, nil)
967+
require.NoError(t, err)
968+
969+
batch, err = sender.NewMessageBatch(context.Background(), nil)
970+
require.NoError(t, err)
971+
972+
i--
973+
}
974+
}
975+
976+
if batch.NumMessages() > 0 {
977+
err = sender.SendMessageBatch(context.Background(), batch, nil)
978+
require.NoError(t, err)
979+
}
980+
981+
receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil)
982+
require.NoError(t, err)
983+
984+
var opts *ReceiveMessagesOptions
985+
986+
if timeAfterFirstMessage != nil {
987+
opts = &ReceiveMessagesOptions{
988+
TimeAfterFirstMessage: *timeAfterFirstMessage,
989+
}
990+
991+
t.Logf("Setting time after first message: %s", *timeAfterFirstMessage)
992+
} else {
993+
t.Log("Using default time after first message")
994+
}
995+
996+
messages, err := receiver.ReceiveMessages(context.Background(), 1000, opts)
997+
require.NoError(t, err)
998+
999+
return len(messages)
1000+
}
1001+
1002+
base := setup(t, nil)
1003+
require.NotZero(t, base)
1004+
t.Logf("Base case: %d messages", base)
1005+
1006+
base2 := setup(t, to.Ptr[time.Duration](0))
1007+
require.NotZero(t, base2)
1008+
t.Logf("Base case2: %d messages", base2)
1009+
1010+
bigger := setup(t, to.Ptr(20*time.Second))
1011+
t.Logf("Bigger: %d messages", bigger)
1012+
require.Greater(t, bigger, base)
1013+
require.Greater(t, bigger, base2)
1014+
}
1015+
9441016
type receivedMessageSlice []*ReceivedMessage
9451017

9461018
func (messages receivedMessageSlice) Len() int {

sdk/messaging/azservicebus/receiver_unit_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ func TestReceiverCancellationUnitTests(t *testing.T) {
8282
ctx, cancel := context.WithCancel(context.Background())
8383

8484
r := &Receiver{
85-
defaultTimeAfterFirstMsg: time.Second,
8685
amqpLinks: &internal.FakeAMQPLinks{
8786
Receiver: &internal.FakeAMQPReceiver{
8887
ReceiveFn: func(ctx context.Context) (*amqp.Message, error) {

0 commit comments

Comments
 (0)