diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 6c13f8a9..6c4140ec 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -563,7 +563,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien let batchWaitingChannel = batchWaiters |> dequeueBatchWaiter batchWaitingChannel.TrySetException ex |> ignore - let stopConsumer () = + let closeConsumerTasks() = unAckedMessageTracker.Close() acksGroupingTracker.Close() clearDeadLetters() @@ -574,6 +574,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien statTimer.Stop() chunkTimer.Stop() cleanup(this) + + let stopConsumer () = + closeConsumerTasks() failWaiters <| AlreadyClosedException "Consumer is already closed" Log.Logger.LogInformation("{0} stopped", prefix) @@ -878,7 +881,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien // auto-topic-creation set to false // No more retries are needed in this case. connectionHandler.Failed() - stopConsumer() + closeConsumerTasks() Log.Logger.LogWarning("{0} Closed consumer because topic does not exist anymore. {1}", prefix, ex.Message) continueLoop <- false | _ -> diff --git a/tests/IntegrationTests/Basic.fs b/tests/IntegrationTests/Basic.fs index 25bb7046..acd98852 100644 --- a/tests/IntegrationTests/Basic.fs +++ b/tests/IntegrationTests/Basic.fs @@ -4,12 +4,14 @@ open System open System.Text.Json open System.Threading open System.Diagnostics +open System.Net.Http open Expecto open Expecto.Flip open System.Text open System.Threading.Tasks +open Microsoft.Extensions.Logging open Pulsar.Client.Api open Pulsar.Client.Common open Serilog @@ -373,6 +375,71 @@ let tests = Expect.isTrue "" isReplicated Log.Debug("Finished 'Create the replicated subscription should be successful'") } + + testTask "Delete topic subscribed by the pattern consumer should not throw error or recreate topic" { + Log.Debug("Started 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'") + let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N") + let client = getClient() + + let pattern = topicName + "-.*" + let! (producer1 : IProducer) = + client.NewProducer() + .Topic(topicName + "-1") + .CreateAsync() + let! (producer2 : IProducer) = + client.NewProducer() + .Topic(topicName + "-2") + .CreateAsync() + + do! producer1.SendAsync([| 0uy |]) + do! producer2.SendAsync([| 0uy |]) + + let! (consumer : IConsumer) = + client.NewConsumer() + .TopicsPattern(pattern) + .ConsumerName("test") + .SubscriptionName("test") + .SubscriptionType(SubscriptionType.Exclusive) + .SubscriptionInitialPosition(SubscriptionInitialPosition.Latest) + .ReplicateSubscriptionState(true) + .SubscribeAsync() + + do! producer1.DisposeAsync().AsTask() + do! producer2.DisposeAsync().AsTask() + + let task = consumer.ReceiveAsync() + + // Check that the task doesn't fail immediately + Expect.isFalse "" task.IsFaulted + Expect.isFalse "" task.IsCanceled + + // Delete topic using HTTP request + let deleteUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1?force=true" + let! (response: HttpResponseMessage) = commonHttpClient.DeleteAsync(deleteUrl) + response.EnsureSuccessStatusCode() |> ignore + + do! Task.Delay(1000) // This make sure that the topic won't be recreated + + // Verify topic is deleted by trying to get stats (should return NotFound) + let statsUrl = $"{pulsarHttpAddress}/admin/v2/persistent/{topicName}-1/stats" + let! (statsResponse: HttpResponseMessage) = commonHttpClient.GetAsync(statsUrl) + Expect.equal "" System.Net.HttpStatusCode.NotFound statsResponse.StatusCode + + // Check that the task is running + Expect.isFalse "" task.IsCompleted + + let! (producer : IProducer) = + client.NewProducer() + .Topic(topicName + "-2") + .CreateAsync() + + do! producer.SendAsync([| 1uy |]) + + let! (msg : Message) = task + + Expect.equal "" [| 1uy |] <| msg.GetValue() + Log.Debug("Finished 'Delete topic subscribed by the pattern consumer should not throw error or recreate topic'") + } #if !NOTLS // Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper