@@ -19,7 +19,6 @@ package pulsar
19
19
20
20
import (
21
21
"context"
22
- "errors"
23
22
"fmt"
24
23
"math/rand"
25
24
"strconv"
@@ -37,9 +36,9 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
37
36
38
37
type acker interface {
39
38
// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
40
- AckID (id trackingMessageID ) error
41
- AckIDWithResponse (id trackingMessageID ) error
42
- NackID (id trackingMessageID )
39
+ AckID (id MessageID ) error
40
+ AckIDWithResponse (id MessageID ) error
41
+ NackID (id MessageID )
43
42
NackMsg (msg Message )
44
43
}
45
44
@@ -93,6 +92,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
93
92
}
94
93
}
95
94
95
+ if options .MaxPendingChunkedMessage == 0 {
96
+ options .MaxPendingChunkedMessage = 100
97
+ }
98
+
99
+ if options .ExpireTimeOfIncompleteChunk == 0 {
100
+ options .ExpireTimeOfIncompleteChunk = time .Minute
101
+ }
102
+
96
103
if options .NackBackoffPolicy == nil && options .EnableDefaultNackBackoffPolicy {
97
104
options .NackBackoffPolicy = new (defaultNackBackoffPolicy )
98
105
}
@@ -344,28 +351,31 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
344
351
nackRedeliveryDelay = c .options .NackRedeliveryDelay
345
352
}
346
353
opts := & partitionConsumerOpts {
347
- topic : pt ,
348
- consumerName : c .consumerName ,
349
- subscription : c .options .SubscriptionName ,
350
- subscriptionType : c .options .Type ,
351
- subscriptionInitPos : c .options .SubscriptionInitialPosition ,
352
- partitionIdx : idx ,
353
- receiverQueueSize : receiverQueueSize ,
354
- nackRedeliveryDelay : nackRedeliveryDelay ,
355
- nackBackoffPolicy : c .options .NackBackoffPolicy ,
356
- metadata : metadata ,
357
- subProperties : subProperties ,
358
- replicateSubscriptionState : c .options .ReplicateSubscriptionState ,
359
- startMessageID : trackingMessageID {},
360
- subscriptionMode : durable ,
361
- readCompacted : c .options .ReadCompacted ,
362
- interceptors : c .options .Interceptors ,
363
- maxReconnectToBroker : c .options .MaxReconnectToBroker ,
364
- backoffPolicy : c .options .BackoffPolicy ,
365
- keySharedPolicy : c .options .KeySharedPolicy ,
366
- schema : c .options .Schema ,
367
- decryption : c .options .Decryption ,
368
- ackWithResponse : c .options .AckWithResponse ,
354
+ topic : pt ,
355
+ consumerName : c .consumerName ,
356
+ subscription : c .options .SubscriptionName ,
357
+ subscriptionType : c .options .Type ,
358
+ subscriptionInitPos : c .options .SubscriptionInitialPosition ,
359
+ partitionIdx : idx ,
360
+ receiverQueueSize : receiverQueueSize ,
361
+ nackRedeliveryDelay : nackRedeliveryDelay ,
362
+ nackBackoffPolicy : c .options .NackBackoffPolicy ,
363
+ metadata : metadata ,
364
+ subProperties : subProperties ,
365
+ replicateSubscriptionState : c .options .ReplicateSubscriptionState ,
366
+ startMessageID : trackingMessageID {},
367
+ subscriptionMode : durable ,
368
+ readCompacted : c .options .ReadCompacted ,
369
+ interceptors : c .options .Interceptors ,
370
+ maxReconnectToBroker : c .options .MaxReconnectToBroker ,
371
+ backoffPolicy : c .options .BackoffPolicy ,
372
+ keySharedPolicy : c .options .KeySharedPolicy ,
373
+ schema : c .options .Schema ,
374
+ decryption : c .options .Decryption ,
375
+ ackWithResponse : c .options .AckWithResponse ,
376
+ maxPendingChunkedMessage : c .options .MaxPendingChunkedMessage ,
377
+ expireTimeOfIncompleteChunk : c .options .ExpireTimeOfIncompleteChunk ,
378
+ autoAckIncompleteChunk : c .options .AutoAckIncompleteChunk ,
369
379
}
370
380
cons , err := newPartitionConsumer (c , c .client , opts , c .messageCh , c .dlq , c .metrics )
371
381
ch <- ConsumerError {
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
456
466
457
467
// AckID the consumption of a single message, identified by its MessageID
458
468
func (c * consumer ) AckID (msgID MessageID ) error {
459
- mid , ok := c .messageID (msgID )
460
- if ! ok {
461
- return errors .New ("failed to convert trackingMessageID" )
462
- }
463
-
464
- if mid .consumer != nil {
465
- return mid .Ack ()
469
+ if err := c .checkMsgIDPartition (msgID ); err != nil {
470
+ return err
466
471
}
467
472
468
473
if c .options .AckWithResponse {
469
- return c .consumers [mid . partitionIdx ].AckIDWithResponse (mid )
474
+ return c .consumers [msgID . PartitionIdx () ].AckIDWithResponse (msgID )
470
475
}
471
476
472
- return c .consumers [mid . partitionIdx ].AckID (mid )
477
+ return c .consumers [msgID . PartitionIdx () ].AckID (msgID )
473
478
}
474
479
475
480
// ReconsumeLater mark a message for redelivery after custom delay
@@ -529,7 +534,7 @@ func (c *consumer) Nack(msg Message) {
529
534
}
530
535
531
536
if mid .consumer != nil {
532
- mid .Nack ( )
537
+ mid .consumer . NackID ( msg . ID () )
533
538
return
534
539
}
535
540
c .consumers [mid .partitionIdx ].NackMsg (msg )
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
540
545
}
541
546
542
547
func (c * consumer ) NackID (msgID MessageID ) {
543
- mid , ok := c .messageID (msgID )
544
- if ! ok {
545
- return
546
- }
547
-
548
- if mid .consumer != nil {
549
- mid .Nack ()
548
+ if err := c .checkMsgIDPartition (msgID ); err != nil {
550
549
return
551
550
}
552
551
553
- c .consumers [mid . partitionIdx ].NackID (mid )
552
+ c .consumers [msgID . PartitionIdx () ].NackID (msgID )
554
553
}
555
554
556
555
func (c * consumer ) Close () {
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
586
585
return newError (SeekFailed , "for partition topic, seek command should perform on the individual partitions" )
587
586
}
588
587
589
- mid , ok := c .messageID (msgID )
590
- if ! ok {
591
- return nil
588
+ if err := c .checkMsgIDPartition (msgID ); err != nil {
589
+ return err
592
590
}
593
591
594
- return c .consumers [mid . partitionIdx ].Seek (mid )
592
+ return c .consumers [msgID . PartitionIdx () ].Seek (msgID )
595
593
}
596
594
597
595
func (c * consumer ) SeekByTime (time time.Time ) error {
@@ -608,6 +606,17 @@ func (c *consumer) SeekByTime(time time.Time) error {
608
606
return errs
609
607
}
610
608
609
+ func (c * consumer ) checkMsgIDPartition (msgID MessageID ) error {
610
+ partition := msgID .PartitionIdx ()
611
+ if partition < 0 || int (partition ) >= len (c .consumers ) {
612
+ c .log .Errorf ("invalid partition index %d expected a partition between [0-%d]" ,
613
+ partition , len (c .consumers ))
614
+ return fmt .Errorf ("invalid partition index %d expected a partition between [0-%d]" ,
615
+ partition , len (c .consumers ))
616
+ }
617
+ return nil
618
+ }
619
+
611
620
var r = & random {
612
621
R : rand .New (rand .NewSource (time .Now ().UnixNano ())),
613
622
}
0 commit comments