Skip to content

Commit f8562cf

Browse files
authored
Fix pattern consumer fails when a subscribed topic is deleted (#319)
* Fix pattern consumer fails when a subscribed topic is deleted. * Improve test * Add log for test
1 parent 0e95ffb commit f8562cf

File tree

2 files changed

+72
-2
lines changed

2 files changed

+72
-2
lines changed

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
563563
let batchWaitingChannel = batchWaiters |> dequeueBatchWaiter
564564
batchWaitingChannel.TrySetException ex |> ignore
565565

566-
let stopConsumer () =
566+
let closeConsumerTasks() =
567567
unAckedMessageTracker.Close()
568568
acksGroupingTracker.Close()
569569
clearDeadLetters()
@@ -574,6 +574,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
574574
statTimer.Stop()
575575
chunkTimer.Stop()
576576
cleanup(this)
577+
578+
let stopConsumer () =
579+
closeConsumerTasks()
577580
failWaiters <| AlreadyClosedException "Consumer is already closed"
578581
Log.Logger.LogInformation("{0} stopped", prefix)
579582

@@ -897,7 +900,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
897900
// auto-topic-creation set to false
898901
// No more retries are needed in this case.
899902
connectionHandler.Failed()
900-
stopConsumer()
903+
closeConsumerTasks()
901904
Log.Logger.LogWarning("{0} Closed consumer because topic does not exist anymore. {1}", prefix, ex.Message)
902905
continueLoop <- false
903906
| _ ->

tests/IntegrationTests/Basic.fs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ open System
44
open System.Text.Json
55
open System.Threading
66
open System.Diagnostics
7+
open System.Net.Http
78

89
open Expecto
910
open Expecto.Flip
1011

1112
open System.Text
1213
open System.Threading.Tasks
14+
open Microsoft.Extensions.Logging
1315
open Pulsar.Client.Api
1416
open Pulsar.Client.Common
1517
open Serilog
@@ -373,6 +375,71 @@ let tests =
373375
Expect.isTrue "" isReplicated
374376
Log.Debug("Finished 'Create the replicated subscription should be successful'")
375377
}
378+
379+
testTask "Delete topic subscribed by the pattern consumer should not throw error or recreate topic" {
380+
Log.Debug("Started 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'")
381+
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
382+
let client = getClient()
383+
384+
let pattern = topicName + "-.*"
385+
let! (producer1 : IProducer<byte[]>) =
386+
client.NewProducer()
387+
.Topic(topicName + "-1")
388+
.CreateAsync()
389+
let! (producer2 : IProducer<byte[]>) =
390+
client.NewProducer()
391+
.Topic(topicName + "-2")
392+
.CreateAsync()
393+
394+
do! producer1.SendAsync([| 0uy |])
395+
do! producer2.SendAsync([| 0uy |])
396+
397+
let! (consumer : IConsumer<byte[]>) =
398+
client.NewConsumer()
399+
.TopicsPattern(pattern)
400+
.ConsumerName("test")
401+
.SubscriptionName("test")
402+
.SubscriptionType(SubscriptionType.Exclusive)
403+
.SubscriptionInitialPosition(SubscriptionInitialPosition.Latest)
404+
.ReplicateSubscriptionState(true)
405+
.SubscribeAsync()
406+
407+
do! producer1.DisposeAsync().AsTask()
408+
do! producer2.DisposeAsync().AsTask()
409+
410+
let task = consumer.ReceiveAsync()
411+
412+
// Check that the task doesn't fail immediately
413+
Expect.isFalse "" task.IsFaulted
414+
Expect.isFalse "" task.IsCanceled
415+
416+
// Delete topic using HTTP request
417+
let deleteUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1?force=true"
418+
let! (response: HttpResponseMessage) = commonHttpClient.DeleteAsync(deleteUrl)
419+
response.EnsureSuccessStatusCode() |> ignore
420+
421+
do! Task.Delay(1000) // This make sure that the topic won't be recreated
422+
423+
// Verify topic is deleted by trying to get stats (should return NotFound)
424+
let statsUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1/stats"
425+
let! (statsResponse: HttpResponseMessage) = commonHttpClient.GetAsync(statsUrl)
426+
Expect.equal "" System.Net.HttpStatusCode.NotFound statsResponse.StatusCode
427+
428+
// Check that the task is running
429+
Expect.isFalse "" task.IsCompleted
430+
431+
let! (producer : IProducer<byte[]>) =
432+
client.NewProducer()
433+
.Topic(topicName + "-2")
434+
.CreateAsync()
435+
436+
do! producer.SendAsync([| 1uy |])
437+
438+
let! (msg : Message<byte[]>) = task
439+
440+
Expect.equal "" [| 1uy |] <| msg.GetValue()
441+
Log.Debug("Finished 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'")
442+
}
376443

377444
#if !NOTLS
378445
// Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper

0 commit comments

Comments
 (0)