Skip to content

Commit 5c7e180

Browse files
authored
Remove message from tableview when value is null (#293)
* Remove message from tableview when value is null * Implement nullvalue * Fix build
1 parent 8f4b289 commit 5c7e180

File tree

9 files changed

+73
-8
lines changed

9 files changed

+73
-8
lines changed

src/Pulsar.Client/Common/DTO.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ type internal Metadata =
213213
EncryptionAlgo: string
214214
OrderingKey: byte[]
215215
ReplicatedFrom: string
216+
NullValue: bool
216217
}
217218

218219
type MessageKey =

src/Pulsar.Client/Internal/BatchMessageContainer.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ module internal BatchHelpers =
4646
if message.Properties.Count > 0 then
4747
for property in message.Properties do
4848
smm.Properties.Add(KeyValue(Key = property.Key, Value = property.Value))
49+
smm.NullValue <- box message.Value |> isNull
4950
Serializer.SerializeWithLengthPrefix(messageStream, smm, PrefixStyle.Fixed32BigEndian)
5051
messageWriter.Write(message.Payload)
5152
struct(BatchDetails(%index, BatchMessageAcker.NullAcker), message, batchItem.Tcs)

src/Pulsar.Client/Internal/ClientCnx.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
530530
EncryptionAlgo = messageMetadata.EncryptionAlgo
531531
OrderingKey = messageMetadata.OrderingKey
532532
ReplicatedFrom = messageMetadata.ReplicatedFrom
533+
NullValue = messageMetadata.NullValue
533534
}
534535

535536
{

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
668668
else
669669
let msgKey = rawMessage.MessageKey
670670
let getValue () =
671-
keyValueProcessor
672-
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T)
673-
|> Option.defaultWith (fun () -> schemaDecodeFunction payload)
671+
if rawMessage.Metadata.NullValue then
672+
Unchecked.defaultof<'T>
673+
else
674+
keyValueProcessor
675+
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T)
676+
|> Option.defaultWith (fun () -> schemaDecodeFunction payload)
674677
let message = Message(
675678
msgId,
676679
payload,
@@ -1330,9 +1333,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13301333
}
13311334
let msgKey = singleMessageMetadata.PartitionKey
13321335
let getValue () =
1333-
keyValueProcessor
1334-
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T)
1335-
|> Option.defaultWith (fun() -> schemaDecodeFunction singleMessagePayload)
1336+
if singleMessageMetadata.NullValue then
1337+
Unchecked.defaultof<'T>
1338+
else
1339+
keyValueProcessor
1340+
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T)
1341+
|> Option.defaultWith (fun () -> schemaDecodeFunction singleMessagePayload)
13361342
let properties =
13371343
if singleMessageMetadata.Properties.Count > 0 then
13381344
singleMessageMetadata.Properties

src/Pulsar.Client/Internal/ProducerImpl.fs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
857857
Some { PartitionKey = %key; IsBase64Encoded = false }
858858
else
859859
Some { PartitionKey = %Convert.ToBase64String(keyBytes); IsBase64Encoded = true }
860-
MessageBuilder(value, schema.Encode(value), keyObj,
860+
let payloay = if box value |> isNull then Array.empty<byte> else schema.Encode(value)
861+
MessageBuilder(value, payloay, keyObj,
861862
?properties0 = (properties |> Option.ofObj),
862863
?deliverAt = (deliverAt |> Option.ofNullable),
863864
?sequenceId = (sequenceId |> Option.ofNullable),

src/Pulsar.Client/Internal/TableViewImpl.fs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ type internal TableViewImpl<'T> private (reader: IReader<'T>) =
1313

1414
member private this.HandleMessage(msg: Message<'T>) =
1515
if not (String.IsNullOrEmpty(%msg.Key)) then
16-
data.AddOrUpdate(%msg.Key, msg.GetValue(), (fun _ _ -> msg.GetValue())) |> ignore
16+
let value = msg.GetValue()
17+
if box value |> isNull then
18+
data.TryRemove(%msg.Key) |> ignore
19+
else
20+
data.AddOrUpdate(%msg.Key, value, (fun _ _ -> value)) |> ignore
1721

1822
member private this.ReadTailMessages(reader: IReader<'T>) =
1923
backgroundTask {

tests/IntegrationTests/Batching.fs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,4 +317,45 @@ let tests =
317317
Log.Debug("Finished 'Second batch is formed well after the first one'")
318318

319319
}
320+
321+
testTask "Null message with batch get sent if batch size exceeds" {
322+
323+
Log.Debug("Started 'Null message with batch get sent if batch size exceeds'")
324+
325+
let client = getClient()
326+
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
327+
let messagesNumber = 5
328+
329+
let! (consumer: IConsumer<byte[]>) =
330+
client.NewConsumer()
331+
.Topic(topicName)
332+
.ConsumerName("batch consumer")
333+
.SubscriptionName("batch-subscription")
334+
.SubscribeAsync()
335+
336+
let! (producer: IProducer<byte[]>) =
337+
client.NewProducer()
338+
.Topic(topicName)
339+
.ProducerName("batch producer")
340+
.EnableBatching(true)
341+
.BatchingMaxMessages(messagesNumber / 2)
342+
.BatchingMaxBytes(100)
343+
.MaxPendingMessages(1)
344+
.BlockIfQueueFull(true)
345+
.CreateAsync()
346+
347+
for i in 0 .. messagesNumber-1 do
348+
producer.SendAsync(producer.NewMessage(null)) |> ignore
349+
350+
for i in 0 .. messagesNumber-1 do
351+
let! (message: Message<byte[]>) = consumer.ReceiveAsync()
352+
match message.MessageId.Type with
353+
| Batch (index, _) ->
354+
Expect.equal $"Run {i} failed" (i % 2) %index
355+
| _ ->
356+
failwith "Expected batch message"
357+
358+
Log.Debug("Finished 'Null message with batch get sent if batch size exceeds'")
359+
360+
}
320361
]

tests/IntegrationTests/TableView.fs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module Pulsar.Client.IntegrationTests.TableView
22

33
open System
44
open System.Linq
5+
open System.Threading.Tasks
56
open Expecto
67
open Expecto.Flip
78
open Pulsar.Client.Api
@@ -55,6 +56,14 @@ let tests =
5556
let value3 = tableView["key2"]
5657
Expect.sequenceEqual "" [| 2uy |] value2
5758
Expect.sequenceEqual "" [| 3uy |] value3
59+
60+
do! producer.SendAsync(producer.NewMessage(null, "key1"))
61+
62+
do! Task.Delay 2000
63+
64+
Expect.equal "" 1 tableView.Count
65+
let key1NotFound = tableView.ContainsKey("key1")
66+
Expect.equal "" false key1NotFound
5867

5968
Log.Debug("Finished testTableView")
6069
}

tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ let tests =
3131
EventTime = Nullable()
3232
OrderingKey = [||]
3333
ReplicatedFrom = ""
34+
NullValue = false
3435
}
3536

3637
let testRawMessage =

0 commit comments

Comments
 (0)