Skip to content

Commit d783933

Browse files
Fix ByteString.Copy when requested length is zero (#6749)
* added reproduction for #6748 * fixed zero-copy issues
1 parent 15ab3b1 commit d783933

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs

+25-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class ConsumerControllerSpecs : TestKit.Xunit2.TestKit
2222
}";
2323

2424
public ConsumerControllerSpecs(ITestOutputHelper outputHelper) : base(
25-
Config.WithFallback(TestSerializer.Config), output: outputHelper)
25+
Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper)
2626
{
2727
}
2828

@@ -648,4 +648,28 @@ public async Task ConsumerController_without_resending_must_accept_lost_message(
648648
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>()).SeqNr.Should().Be(35);
649649
consumerController.Tell(ConsumerController.Confirmed.Instance);
650650
}
651+
652+
/// <summary>
653+
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6748
654+
/// </summary>
655+
[Fact]
656+
public async Task ConsumerController_can_process_zero_length_Chunk()
657+
{
658+
NextId();
659+
var consumerController = Sys.ActorOf(ConsumerController.Create<ZeroLengthSerializer.TestMsg>(Sys, Option<IActorRef>.None),
660+
$"consumerController-{_idCount}");
661+
var producerControllerProbe = CreateTestProbe();
662+
663+
var consumerProbe = CreateTestProbe();
664+
consumerController.Tell(new ConsumerController.Start<ZeroLengthSerializer.TestMsg>(consumerProbe));
665+
666+
// one chunk for each letter, "123" is 3 chunks
667+
var chunks1 =
668+
ProducerController<ZeroLengthSerializer.TestMsg>.CreateChunks(ZeroLengthSerializer.TestMsg.Instance, chunkSize: 1, Sys.Serialization);
669+
var seqMessages1 = chunks1.Select((c, i) =>
670+
ConsumerController.SequencedMessage<ZeroLengthSerializer.TestMsg>.FromChunkedMessage(ProducerId, 1 + i, c, i == 0, false,
671+
producerControllerProbe)).ToList();
672+
consumerController.Tell(seqMessages1.First());
673+
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance);
674+
}
651675
}

src/core/Akka.Tests/Delivery/TestConsumer.cs

+64
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,70 @@ public static Props PropsFor(TimeSpan delay, Func<SomeAsyncJob, bool> endConditi
191191
public ITimerScheduler Timers { get; set; } = null!;
192192
}
193193

194+
/// <summary>
195+
/// For testing purposes
196+
/// </summary>
197+
public sealed class ZeroLengthSerializer : SerializerWithStringManifest
198+
{
199+
public static readonly Config Config = ConfigurationFactory.ParseString(@"
200+
akka.actor {
201+
serializers {
202+
delivery-zero-length = ""Akka.Tests.Delivery.ZeroLengthSerializer, Akka.Tests""
203+
}
204+
serialization-bindings {
205+
""Akka.Tests.Delivery.ZeroLengthSerializer+TestMsg, Akka.Tests"" = delivery-zero-length
206+
}
207+
}");
208+
209+
public class TestMsg
210+
{
211+
private TestMsg()
212+
{
213+
}
214+
public static readonly TestMsg Instance = new();
215+
}
216+
217+
public ZeroLengthSerializer(ExtendedActorSystem system) : base(system)
218+
{
219+
}
220+
221+
public override byte[] ToBinary(object obj)
222+
{
223+
switch (obj)
224+
{
225+
case TestMsg _:
226+
return Array.Empty<byte>();
227+
default:
228+
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}]");
229+
}
230+
}
231+
232+
public override object FromBinary(byte[] bytes, string manifest)
233+
{
234+
switch (manifest)
235+
{
236+
case "A":
237+
return TestMsg.Instance;
238+
default:
239+
throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
240+
}
241+
242+
}
243+
244+
public override string Manifest(object obj)
245+
{
246+
switch (obj)
247+
{
248+
case TestMsg _:
249+
return "A";
250+
default:
251+
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}]");
252+
}
253+
}
254+
255+
public override int Identifier => 919191;
256+
}
257+
194258
/// <summary>
195259
/// INTERNAL API
196260
/// </summary>

src/core/Akka/Util/ByteString.cs

+3
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ public ByteString Concat(ByteString other)
539539
/// <returns>TBD</returns>
540540
public int CopyTo(byte[] buffer, int index, int count)
541541
{
542+
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
542543
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
543544
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
544545
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
@@ -575,6 +576,7 @@ public int CopyTo(ref Memory<byte> buffer)
575576
/// <returns>The number of bytes copied</returns>
576577
public int CopyTo(ref Memory<byte> buffer, int index, int count)
577578
{
579+
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
578580
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
579581
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
580582

@@ -613,6 +615,7 @@ public int CopyTo(ref Span<byte> buffer)
613615
/// <returns>The number of bytes copied</returns>
614616
public int CopyTo(ref Span<byte> buffer, int index, int count)
615617
{
618+
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
616619
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
617620
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
618621

0 commit comments

Comments
 (0)