diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrepender.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrepender.java index bf417217e544..0e0be90eb99b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrepender.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrepender.java @@ -25,9 +25,6 @@ public class AsyncStreamPrepender implements Publisher { private final Publisher delegate; private final T firstItem; - private Subscriber subscriber; - private volatile boolean complete = false; - private volatile boolean firstRequest = true; public AsyncStreamPrepender(Publisher delegate, T firstItem) { this.delegate = delegate; @@ -36,11 +33,18 @@ public AsyncStreamPrepender(Publisher delegate, T firstItem) { @Override public void subscribe(Subscriber s) { - subscriber = s; - delegate.subscribe(new DelegateSubscriber()); + delegate.subscribe(new DelegateSubscriber(s)); } private class DelegateSubscriber implements Subscriber { + private final Subscriber subscriber; + private volatile boolean complete = false; + private volatile boolean firstRequest = true; + + private DelegateSubscriber(Subscriber subscriber) { + this.subscriber = subscriber; + } + @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(new Subscription() { @@ -90,7 +94,6 @@ public void request(long n) { public void cancel() { cancelled = true; subscription.cancel(); - subscriber = null; } }); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrependerTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrependerTest.java index 1e1543039b85..16d3a7050be1 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrependerTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrependerTest.java @@ -62,6 +62,21 @@ public void sequence() { } } + @Test + public void multiSubscribe_stillPrepends() { + AsyncStreamPrepender prepender = new AsyncStreamPrepender<>(rangeLong(1L, 5L), 0L); + Flowable prepended1 = fromPublisher(prepender); + Flowable prepended2 = fromPublisher(prepender); + + Iterator iterator1 = prepended1.blockingIterable(1).iterator(); + Iterator iterator2 = prepended2.blockingIterable(1).iterator(); + + for (long i = 0; i <= 5; i++) { + assertEquals(i, iterator1.next().longValue()); + assertEquals(i, iterator2.next().longValue()); + } + } + @Test public void error() { Flowable error = Flowable.error(IllegalStateException::new);