Skip to content

Fix pattern consumer fails when a subscribed topic is deleted #319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
let batchWaitingChannel = batchWaiters |> dequeueBatchWaiter
batchWaitingChannel.TrySetException ex |> ignore

let stopConsumer () =
let closeConsumerTasks() =
unAckedMessageTracker.Close()
acksGroupingTracker.Close()
clearDeadLetters()
Expand All @@ -574,6 +574,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
statTimer.Stop()
chunkTimer.Stop()
cleanup(this)

let stopConsumer () =
closeConsumerTasks()
failWaiters <| AlreadyClosedException "Consumer is already closed"
Log.Logger.LogInformation("{0} stopped", prefix)

Expand Down Expand Up @@ -878,7 +881,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
// auto-topic-creation set to false
// No more retries are needed in this case.
connectionHandler.Failed()
stopConsumer()
closeConsumerTasks()
Log.Logger.LogWarning("{0} Closed consumer because topic does not exist anymore. {1}", prefix, ex.Message)
continueLoop <- false
| _ ->
Expand Down
67 changes: 67 additions & 0 deletions tests/IntegrationTests/Basic.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ open System
open System.Text.Json
open System.Threading
open System.Diagnostics
open System.Net.Http

open Expecto
open Expecto.Flip

open System.Text
open System.Threading.Tasks
open Microsoft.Extensions.Logging
open Pulsar.Client.Api
open Pulsar.Client.Common
open Serilog
Expand Down Expand Up @@ -373,6 +375,71 @@ let tests =
Expect.isTrue "" isReplicated
Log.Debug("Finished 'Create the replicated subscription should be successful'")
}

testTask "Delete topic subscribed by the pattern consumer should not throw error or recreate topic" {
Log.Debug("Started 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'")
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let client = getClient()

let pattern = topicName + "-.*"
let! (producer1 : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName + "-1")
.CreateAsync()
let! (producer2 : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName + "-2")
.CreateAsync()

do! producer1.SendAsync([| 0uy |])
do! producer2.SendAsync([| 0uy |])

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.TopicsPattern(pattern)
.ConsumerName("test")
.SubscriptionName("test")
.SubscriptionType(SubscriptionType.Exclusive)
.SubscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.ReplicateSubscriptionState(true)
.SubscribeAsync()

do! producer1.DisposeAsync().AsTask()
do! producer2.DisposeAsync().AsTask()

let task = consumer.ReceiveAsync()

// Check that the task doesn't fail immediately
Expect.isFalse "" task.IsFaulted
Expect.isFalse "" task.IsCanceled

// Delete topic using HTTP request
let deleteUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1?force=true"
let! (response: HttpResponseMessage) = commonHttpClient.DeleteAsync(deleteUrl)
response.EnsureSuccessStatusCode() |> ignore

do! Task.Delay(1000) // This make sure that the topic won't be recreated

// Verify topic is deleted by trying to get stats (should return NotFound)
let statsUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1/stats"
let! (statsResponse: HttpResponseMessage) = commonHttpClient.GetAsync(statsUrl)
Expect.equal "" System.Net.HttpStatusCode.NotFound statsResponse.StatusCode

// Check that the task is running
Expect.isFalse "" task.IsCompleted

let! (producer : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName + "-2")
.CreateAsync()

do! producer.SendAsync([| 1uy |])

let! (msg : Message<byte[]>) = task

Expect.equal "" [| 1uy |] <| msg.GetValue()
Log.Debug("Finished 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'")
}

#if !NOTLS
// Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper
Expand Down
Loading