Skip to content

Commit bf9c1ce

Browse files
Fix: ProducerControllerImpl now respects bounds when chunking large messages (#6755)
* close #6754 - fix chunking bounds check Error was caused by requesting a chunkSize greater than the remaining bytes. Also - changed methods so we no longer copy memory during chunking. * avoid more copying * Update ProducerControllerSpec.cs
1 parent b68ba56 commit bf9c1ce

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs

+43-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// -----------------------------------------------------------------------
77

88
using System;
9+
using System.Linq;
910
using System.Threading.Tasks;
1011
using Akka.Actor;
1112
using Akka.Configuration;
@@ -453,4 +454,45 @@ public async Task
453454
producerController.Tell(new ProducerController.Request(5L, 15L, false, false));
454455
await replyTo.ExpectMsgAsync(5L);
455456
}
456-
}
457+
458+
/// <summary>
459+
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6754
460+
/// </summary>
461+
[Fact]
462+
public async Task Repro6754()
463+
{
464+
NextId();
465+
var consumerControllerProbe = CreateTestProbe();
466+
467+
var msg = new string('*', 10_000); // 10k char length string
468+
var producerController =
469+
Sys.ActorOf(
470+
ProducerController.Create<Job>(Sys, ProducerId, Option<Props>.None,
471+
ProducerController.Settings.Create(Sys) with { ChunkLargeMessagesBytes = 1024}),
472+
$"producerController-{_idCount}");
473+
474+
var producerProbe = CreateTestProbe();
475+
producerController.Tell(new ProducerController.Start<Job>(producerProbe.Ref));
476+
477+
producerController.Tell(new ProducerController.RegisterConsumer<Job>(consumerControllerProbe.Ref));
478+
479+
(await producerProbe.ExpectMsgAsync<ProducerController.RequestNext<Job>>())
480+
.SendNextTo.Tell(new Job(msg));
481+
var seqMsg1 = await consumerControllerProbe.ExpectMsgAsync<ConsumerController.SequencedMessage<Job>>();
482+
seqMsg1.Message.IsMessage.Should().BeFalse();
483+
seqMsg1.Message.Chunk.HasValue.Should().BeTrue();
484+
seqMsg1.IsFirstChunk.Should().BeTrue();
485+
seqMsg1.IsLastChunk.Should().BeFalse();
486+
seqMsg1.SeqNr.Should().Be(1);
487+
488+
producerController.Tell(new ProducerController.Request(0L, 10L, true, false));
489+
490+
var seqMsg2 = await consumerControllerProbe.ExpectMsgAsync<ConsumerController.SequencedMessage<Job>>();
491+
seqMsg2.Message.IsMessage.Should().BeFalse();
492+
seqMsg2.Message.Chunk.HasValue.Should().BeTrue();
493+
seqMsg2.IsFirstChunk.Should().BeFalse();
494+
seqMsg2.IsLastChunk.Should().BeFalse();
495+
seqMsg2.SeqNr.Should().Be(2);
496+
497+
}
498+
}

src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ internal static IEnumerable<ChunkedMessage> CreateChunks(T msg, int chunkSize,
747747
var bytes = serialization.Serialize(msg);
748748
if (bytes.Length <= chunkSize)
749749
{
750-
var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes), true, true, serializerId, manifest);
750+
var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes), true, true, serializerId, manifest);
751751
yield return chunkedMessage;
752752
}
753753
else
@@ -757,7 +757,8 @@ internal static IEnumerable<ChunkedMessage> CreateChunks(T msg, int chunkSize,
757757
for (var i = 0; i < chunkCount; i++)
758758
{
759759
var isLast = i == chunkCount - 1;
760-
var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes, i * chunkSize, chunkSize), first,
760+
var nextChunk = Math.Min(chunkSize, bytes.Length - i * chunkSize); // needs to be the next chunkSize or remaining bytes, whichever is smaller.
761+
var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes, i * chunkSize, nextChunk), first,
761762
isLast, serializerId, manifest);
762763

763764
first = false;

0 commit comments

Comments
 (0)