diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 76db9b76a6b..a0d17bc8861 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -185,25 +185,17 @@ public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1() { await this.AssertAllStagesStoppedAsync(async () => { - var (sink, probe) = MergeHub.Source(1) - .ToMaterialized(this.SinkProbe(), Keep.Both) + var (sink, result) = MergeHub.Source(1) + .Take(20000) + .ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - + + Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer); - - await probe.RequestAsync(int.MaxValue); - var result = new List(); - foreach (var i in Enumerable.Range(1, 20000)) - { - var evt = await probe.ExpectEventAsync(); - if (evt is TestSubscriber.OnNext next) - result.Add(next.Element); - else - throw new Exception($"For element [{i}]: Expected OnNext but received {evt.GetType()}"); - } - result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); - }, Materializer, 300.Seconds()); + + (await result).OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); + }, Materializer, 3.Seconds()); } [Fact] diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 32505ee03c1..b4727239bf8 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -86,7 +86,7 @@ public ProducerFailed(string message, Exception cause) : base(message, cause) /// /// INTERNAL API /// - /// TBD + /// The type of element emitted by the MergeHub internal class MergeHub : GraphStageWithMaterializedValue, Sink> { #region Internal classes @@ -426,14 +426,14 @@ public HubSink(MergeHub hub, AtomicCounterLong idCounter, HubLogic logic) /// TBD /// /// TBD - /// - /// This exception is thrown when the specified is less than or equal to zero. + /// + /// This exception is thrown when the specified is less than or equal to zero. /// public MergeHub(int perProducerBufferSize) { if (perProducerBufferSize <= 0) - throw new ArgumentException("Buffer size must be positive", nameof(perProducerBufferSize)); - + throw new ArgumentOutOfRangeException(nameof(perProducerBufferSize), perProducerBufferSize, "Buffer size must be positive"); + _perProducerBufferSize = perProducerBufferSize; DemandThreshold = perProducerBufferSize / 2 + perProducerBufferSize % 2; Shape = new SourceShape(Out);