Skip to content

Commit 04b463d

Browse files
[azeventhubs] Latest start position can also be inclusive (ie, get the latest message) (#20744)
1 parent e1a6152 commit 04b463d

File tree

3 files changed

+28
-24
lines changed

3 files changed

+28
-24
lines changed

sdk/messaging/azeventhubs/consumer_client_unit_test.go

+17-13
Original file line numberDiff line numberDiff line change
@@ -49,42 +49,46 @@ func TestUnitNewConsumerClient(t *testing.T) {
4949

5050
func TestUnit_getOffsetExpression(t *testing.T) {
5151
t.Run("Valid", func(t *testing.T) {
52-
expr, err := getOffsetExpression(StartPosition{})
52+
expr, err := getStartExpression(StartPosition{})
5353
require.NoError(t, err)
5454
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)
5555

56-
expr, err = getOffsetExpression(StartPosition{Earliest: to.Ptr(true)})
56+
expr, err = getStartExpression(StartPosition{Earliest: to.Ptr(true)})
5757
require.NoError(t, err)
5858
require.Equal(t, "amqp.annotation.x-opt-offset > '-1'", expr)
5959

60-
expr, err = getOffsetExpression(StartPosition{Latest: to.Ptr(true)})
60+
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true)})
6161
require.NoError(t, err)
6262
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)
6363

64-
expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101))})
64+
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true), Inclusive: true})
65+
require.NoError(t, err)
66+
require.Equal(t, "amqp.annotation.x-opt-offset >= '@latest'", expr)
67+
68+
expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101))})
6569
require.NoError(t, err)
6670
require.Equal(t, "amqp.annotation.x-opt-offset > '101'", expr)
6771

68-
expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
72+
expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
6973
require.NoError(t, err)
7074
require.Equal(t, "amqp.annotation.x-opt-offset >= '101'", expr)
7175

72-
expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
76+
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
7377
require.NoError(t, err)
7478
require.Equal(t, "amqp.annotation.x-opt-sequence-number > '202'", expr)
7579

76-
expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
80+
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
7781
require.NoError(t, err)
7882
require.Equal(t, "amqp.annotation.x-opt-sequence-number >= '202'", expr)
7983

8084
enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
8185
require.NoError(t, err)
8286

83-
expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime})
87+
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime})
8488
require.NoError(t, err)
8589
require.Equal(t, "amqp.annotation.x-opt-enqueued-time > '1577840523000'", expr)
8690

87-
expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
91+
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
8892
require.NoError(t, err)
8993
require.Equal(t, "amqp.annotation.x-opt-enqueued-time >= '1577840523000'", expr)
9094
})
@@ -93,28 +97,28 @@ func TestUnit_getOffsetExpression(t *testing.T) {
9397
enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
9498
require.NoError(t, err)
9599

96-
expr, err := getOffsetExpression(StartPosition{
100+
expr, err := getStartExpression(StartPosition{
97101
EnqueuedTime: &enqueueTime,
98102
Offset: to.Ptr[int64](101),
99103
})
100104
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
101105
require.Empty(t, expr)
102106

103-
expr, err = getOffsetExpression(StartPosition{
107+
expr, err = getStartExpression(StartPosition{
104108
Offset: to.Ptr[int64](202),
105109
Latest: to.Ptr(true),
106110
})
107111
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
108112
require.Empty(t, expr)
109113

110-
expr, err = getOffsetExpression(StartPosition{
114+
expr, err = getStartExpression(StartPosition{
111115
Latest: to.Ptr(true),
112116
SequenceNumber: to.Ptr[int64](202),
113117
})
114118
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
115119
require.Empty(t, expr)
116120

117-
expr, err = getOffsetExpression(StartPosition{
121+
expr, err = getStartExpression(StartPosition{
118122
SequenceNumber: to.Ptr[int64](202),
119123
Earliest: to.Ptr(true),
120124
})

sdk/messaging/azeventhubs/partition_client.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options
184184
numEvents := len(events)
185185
lastSequenceNumber := events[numEvents-1].SequenceNumber
186186

187-
pc.offsetExpression = formatOffsetExpressionForSequence(">", lastSequenceNumber)
187+
pc.offsetExpression = formatStartExpressionForSequence(">", lastSequenceNumber)
188188
log.Writef(EventConsumer, "%d Events received, moving sequence to %d", numEvents, lastSequenceNumber)
189189
return events, nil
190190
}
@@ -274,7 +274,7 @@ func newPartitionClient(args partitionClientArgs, options *PartitionClientOption
274274
options = &PartitionClientOptions{}
275275
}
276276

277-
offsetExpr, err := getOffsetExpression(options.StartPosition)
277+
offsetExpr, err := getStartExpression(options.StartPosition)
278278

279279
if err != nil {
280280
return nil, err
@@ -317,11 +317,11 @@ func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message {
317317
return messages
318318
}
319319

320-
func getOffsetExpression(startPosition StartPosition) (string, error) {
321-
lt := ">"
320+
func getStartExpression(startPosition StartPosition) (string, error) {
321+
gt := ">"
322322

323323
if startPosition.Inclusive {
324-
lt = ">="
324+
gt = ">="
325325
}
326326

327327
var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
@@ -330,7 +330,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
330330

331331
if startPosition.EnqueuedTime != nil {
332332
// time-based, non-inclusive
333-
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli())
333+
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", gt, startPosition.EnqueuedTime.UnixMilli())
334334
}
335335

336336
if startPosition.Offset != nil {
@@ -340,23 +340,23 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
340340
return "", errMultipleFieldsSet
341341
}
342342

343-
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset)
343+
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", gt, *startPosition.Offset)
344344
}
345345

346346
if startPosition.Latest != nil && *startPosition.Latest {
347347
if offsetExpr != "" {
348348
return "", errMultipleFieldsSet
349349
}
350350

351-
offsetExpr = "amqp.annotation.x-opt-offset > '@latest'"
351+
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '@latest'", gt)
352352
}
353353

354354
if startPosition.SequenceNumber != nil {
355355
if offsetExpr != "" {
356356
return "", errMultipleFieldsSet
357357
}
358358

359-
offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber)
359+
offsetExpr = formatStartExpressionForSequence(gt, *startPosition.SequenceNumber)
360360
}
361361

362362
if startPosition.Earliest != nil && *startPosition.Earliest {
@@ -375,6 +375,6 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
375375
return "amqp.annotation.x-opt-offset > '@latest'", nil
376376
}
377377

378-
func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string {
378+
func formatStartExpressionForSequence(op string, sequenceNumber int64) string {
379379
return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber)
380380
}

sdk/messaging/azeventhubs/processor_unit_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestUnit_Processor_Run_startPosition(t *testing.T) {
236236
fakeConsumerClient := simpleFakeConsumerClient()
237237

238238
fakeConsumerClient.newPartitionClientFn = func(partitionID string, options *PartitionClientOptions) (*PartitionClient, error) {
239-
offsetExpr, err := getOffsetExpression(options.StartPosition)
239+
offsetExpr, err := getStartExpression(options.StartPosition)
240240
require.NoError(t, err)
241241

242242
return newFakePartitionClient(partitionID, offsetExpr), nil

0 commit comments

Comments
 (0)