Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Drop checks for unsound null values #189

Merged
merged 2 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/src/aggregate_sample.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ extension AggregateSample<T> on Stream<T> {
} else {
triggerSub!.pause();
Copy link
Contributor

@lrhn lrhn Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pause happens only if the current stream is broadcast, and the trigger stream is not.

If this stream is a broadcast stream, then its emissions shouldn't depend on whether it has subscribers or not,
and I'd not pause the triggerSub here. Or valueSub for that matter.
There should be no difference for someone who listens, cancels and listens again whethere there are other listeners at the same time. Which means that even if both streams are broadcast streams, cancelling and resubscribing isn't the same as staying subscribed, because the state can depend on all the prior value events and on whether the last event was at trigger or a value.

If this stream is not broadcast, then nothing ever needs to happen after cancel, and we should cancel triggerSub too, so that's fine.

I'd make this:

controler.onCancel = () {
  if (!isBroadcast) {
    if (isValueDone) {
      if (isTriggerDone) return null;
      return triggerSub.cancel();
    } 
    if (isTriggerDone) return triggerSub!.cancel();
    return [valueSub!.cancel(), triggerSub!.cancel()].wait.then((_) => null);
  }
};

(Probably no need to set the ...Sub variables to null. Nothing holds on to a cancelled completer.)

Then onListen should not assume that it starts from scratch if it's called a second time on a broadcast stream. It should probably just do nothing the second time.

Also noticed: onEmpty should probably not be required if longPoll: true makes it not be used.
Making it optional, and then not emitting anything on a longPoll:false empty trigger would be consistent. You get a value when you ask for one, if there is one.
Or make longPoll the behavior when onEmpty == null;, instead of asking for it and then also passing an unusable onEmpty.
Or keep onEmpty required, and use it if a longPoll is active when the values stream closes.

Copy link
Contributor

@lrhn lrhn Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Just for the heck of it, here is a complete rewrite: https://dartpad.dev/?id=c834bfd326ab1341903145bb9e32bfa7
It moves almost everything inside onListen, which avoids some nullable types, and it wraps user function calls in try/catch.)
Completely untested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't pass the tests as-is, but there could be tests which are overly coupled to the particular async event interleaving. Maybe file this as an issue to consider?

}
// Handle opt-out nulls
cancels.removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return cancels.wait.then((_) => null);
};
Expand Down
5 changes: 1 addition & 4 deletions lib/src/async_expand.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ extension AsyncExpand<T> on Stream<T> {
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
return cancels.wait.then((_) => null);
return [for (var s in subscriptions) s.cancel()].wait.then((_) => null);
};
};
return controller.stream;
Expand Down
10 changes: 2 additions & 8 deletions lib/src/combine_latest.dart
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ extension CombineLatest<T> on Stream<T> {
var cancels = [
sourceSubscription!.cancel(),
otherSubscription!.cancel()
]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
];
sourceSubscription = null;
otherSubscription = null;
return cancels.wait.then((_) => null);
Expand Down Expand Up @@ -230,11 +228,7 @@ extension CombineLatest<T> on Stream<T> {
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return cancels.wait.then((_) => null);
return [for (var s in subscriptions) s.cancel()].wait.then((_) => null);
};
};
return controller.stream;
Expand Down
6 changes: 1 addition & 5 deletions lib/src/merge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,7 @@ extension Merge<T> on Stream<T> {
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return cancels.wait.then((_) => null);
return [for (var s in subscriptions) s.cancel()].wait.then((_) => null);
};
};
return controller.stream;
Expand Down
4 changes: 1 addition & 3 deletions lib/src/switch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ extension SwitchLatest<T> on Stream<Stream<T>> {
var cancels = [
if (!outerStreamDone) outerSubscription.cancel(),
if (sub != null) sub.cancel(),
]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
];
if (cancels.isEmpty) return null;
return cancels.wait.then(_ignore);
};
Expand Down