Skip to content

Commit 0a8e843

Browse files
fix MergeHub_must_work_with_long_streams_when_buffer_size_is_1 (#6564)
* fix MergeHub_must_work_with_long_streams_when_buffer_size_is_1 99% sure that trying to ReSharper this into a method that doesn't use tail recursion actually created a bug here that can occur for really small partitions - making the `firstAttempt` value mutable across all possible instances results in behavior that actually isn't comparable to using isolated function invocations in all instances. Going to run this through CI and see if that adds up. * Revert "fix MergeHub_must_work_with_long_streams_when_buffer_size_is_1" This reverts commit 3bc0b46. * tremendously simplified `MergeHub_must_work_with_long_streams_when_buffer_size_is_1`
1 parent d1299d0 commit 0a8e843

File tree

2 files changed

+13
-21
lines changed

2 files changed

+13
-21
lines changed

src/core/Akka.Streams.Tests/Dsl/HubSpec.cs

+8-16
Original file line numberDiff line numberDiff line change
@@ -185,25 +185,17 @@ public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1()
185185
{
186186
await this.AssertAllStagesStoppedAsync(async () =>
187187
{
188-
var (sink, probe) = MergeHub.Source<int>(1)
189-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
188+
var (sink, result) = MergeHub.Source<int>(1)
189+
.Take(20000)
190+
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
190191
.Run(Materializer);
191-
192+
193+
192194
Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer);
193195
Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer);
194-
195-
await probe.RequestAsync(int.MaxValue);
196-
var result = new List<int>();
197-
foreach (var i in Enumerable.Range(1, 20000))
198-
{
199-
var evt = await probe.ExpectEventAsync();
200-
if (evt is TestSubscriber.OnNext<int> next)
201-
result.Add(next.Element);
202-
else
203-
throw new Exception($"For element [{i}]: Expected OnNext<int> but received {evt.GetType()}");
204-
}
205-
result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000));
206-
}, Materializer, 300.Seconds());
196+
197+
(await result).OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000));
198+
}, Materializer, 3.Seconds());
207199
}
208200

209201
[Fact]

src/core/Akka.Streams/Dsl/Hub.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public ProducerFailed(string message, Exception cause) : base(message, cause)
8686
/// <summary>
8787
/// INTERNAL API
8888
/// </summary>
89-
/// <typeparam name="T">TBD</typeparam>
89+
/// <typeparam name="T">The type of element emitted by the MergeHub</typeparam>
9090
internal class MergeHub<T> : GraphStageWithMaterializedValue<SourceShape<T>, Sink<T, NotUsed>>
9191
{
9292
#region Internal classes
@@ -426,14 +426,14 @@ public HubSink(MergeHub<T> hub, AtomicCounterLong idCounter, HubLogic logic)
426426
/// TBD
427427
/// </summary>
428428
/// <param name="perProducerBufferSize">TBD</param>
429-
/// <exception cref="ArgumentException">
430-
/// This exception is thrown when the specified <paramref name="perProducerBufferSize"/> is less than or equal to zero.
429+
/// <exception cref="ArgumentOutOfRangeException">
430+
/// This exception is thrown when the specified <paramref name="perProducerBufferSize"/>is less than or equal to zero.
431431
/// </exception>
432432
public MergeHub(int perProducerBufferSize)
433433
{
434434
if (perProducerBufferSize <= 0)
435-
throw new ArgumentException("Buffer size must be positive", nameof(perProducerBufferSize));
436-
435+
throw new ArgumentOutOfRangeException(nameof(perProducerBufferSize), perProducerBufferSize, "Buffer size must be positive");
436+
437437
_perProducerBufferSize = perProducerBufferSize;
438438
DemandThreshold = perProducerBufferSize / 2 + perProducerBufferSize % 2;
439439
Shape = new SourceShape<T>(Out);

0 commit comments

Comments
 (0)