This repository was archived by the owner on Feb 10, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 8 files changed +29
-13
lines changed Expand file tree Collapse file tree 8 files changed +29
-13
lines changed Original file line number Diff line number Diff line change 4
4
5
5
import 'dart:async' ;
6
6
7
+ import 'common_callbacks.dart' ;
8
+
7
9
extension AggregateSample <T > on Stream <T > {
8
10
/// Computes a value based on sequences of events, then emits that value when
9
11
/// [trigger] emits an event.
@@ -136,7 +138,7 @@ extension AggregateSample<T> on Stream<T> {
136
138
triggerSub! .pause ();
137
139
}
138
140
if (cancels.isEmpty) return null ;
139
- return cancels.wait.then ((_) => null );
141
+ return cancels.wait.then (ignoreArgument );
140
142
};
141
143
};
142
144
return controller.stream;
Original file line number Diff line number Diff line change 4
4
5
5
import 'dart:async' ;
6
6
7
+ import 'common_callbacks.dart' ;
7
8
import 'switch.dart' ;
8
9
9
10
/// Alternatives to [asyncExpand] .
@@ -78,7 +79,9 @@ extension AsyncExpand<T> on Stream<T> {
78
79
}
79
80
controller.onCancel = () {
80
81
if (subscriptions.isEmpty) return null ;
81
- return [for (var s in subscriptions) s.cancel ()].wait.then ((_) => null );
82
+ return [for (var s in subscriptions) s.cancel ()]
83
+ .wait
84
+ .then (ignoreArgument);
82
85
};
83
86
};
84
87
return controller.stream;
Original file line number Diff line number Diff line change 5
5
import 'dart:async' ;
6
6
7
7
import 'aggregate_sample.dart' ;
8
+ import 'common_callbacks.dart' ;
8
9
import 'from_handlers.dart' ;
9
10
import 'rate_limit.dart' ;
10
11
@@ -72,7 +73,7 @@ extension AsyncMap<T> on Stream<T> {
72
73
trigger: workFinished.stream,
73
74
aggregate: _dropPrevious,
74
75
longPoll: true ,
75
- onEmpty: _ignore < T > )
76
+ onEmpty: ignoreArgument )
76
77
._asyncMapThen (convert, workFinished.add);
77
78
}
78
79
@@ -133,4 +134,3 @@ extension AsyncMap<T> on Stream<T> {
133
134
}
134
135
135
136
T _dropPrevious <T >(T event, _) => event;
136
- void _ignore <T >(Sink <T > sink) {}
Original file line number Diff line number Diff line change 4
4
5
5
import 'dart:async' ;
6
6
7
+ import 'common_callbacks.dart' ;
8
+
7
9
/// Utilities to combine events from multiple streams through a callback or into
8
10
/// a list.
9
11
extension CombineLatest <T > on Stream <T > {
@@ -131,7 +133,7 @@ extension CombineLatest<T> on Stream<T> {
131
133
];
132
134
sourceSubscription = null ;
133
135
otherSubscription = null ;
134
- return cancels.wait.then ((_) => null );
136
+ return cancels.wait.then (ignoreArgument );
135
137
};
136
138
};
137
139
return controller.stream;
@@ -228,7 +230,9 @@ extension CombineLatest<T> on Stream<T> {
228
230
}
229
231
controller.onCancel = () {
230
232
if (subscriptions.isEmpty) return null ;
231
- return [for (var s in subscriptions) s.cancel ()].wait.then ((_) => null );
233
+ return [for (var s in subscriptions) s.cancel ()]
234
+ .wait
235
+ .then (ignoreArgument);
232
236
};
233
237
};
234
238
return controller.stream;
Original file line number Diff line number Diff line change
1
+ // Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2
+ // for details. All rights reserved. Use of this source code is governed by a
3
+ // BSD-style license that can be found in the LICENSE file.
4
+
5
+ void ignoreArgument (_) {}
Original file line number Diff line number Diff line change 4
4
5
5
import 'dart:async' ;
6
6
7
+ import 'common_callbacks.dart' ;
8
+
7
9
/// Utilities to interleave events from multiple streams.
8
10
extension Merge <T > on Stream <T > {
9
11
/// Merges values and errors from this stream and [other] in any order as they
@@ -90,7 +92,9 @@ extension Merge<T> on Stream<T> {
90
92
}
91
93
controller.onCancel = () {
92
94
if (subscriptions.isEmpty) return null ;
93
- return [for (var s in subscriptions) s.cancel ()].wait.then ((_) => null );
95
+ return [for (var s in subscriptions) s.cancel ()]
96
+ .wait
97
+ .then (ignoreArgument);
94
98
};
95
99
};
96
100
return controller.stream;
Original file line number Diff line number Diff line change 5
5
import 'dart:async' ;
6
6
7
7
import 'aggregate_sample.dart' ;
8
+ import 'common_callbacks.dart' ;
8
9
import 'from_handlers.dart' ;
9
10
10
11
/// Utilities to rate limit events.
@@ -305,7 +306,7 @@ extension RateLimit<T> on Stream<T> {
305
306
trigger: trigger,
306
307
aggregate: _dropPrevious,
307
308
longPoll: longPoll,
308
- onEmpty: _ignore );
309
+ onEmpty: ignoreArgument );
309
310
310
311
/// Aggregates values until this source stream does not emit for [duration] ,
311
312
/// then emits the aggregated values.
@@ -353,4 +354,3 @@ extension RateLimit<T> on Stream<T> {
353
354
T _dropPrevious <T >(T element, _) => element;
354
355
List <T > _collect <T >(T event, List <T >? soFar) => (soFar ?? < T > [])..add (event);
355
356
void _empty <T >(Sink <List <T >> sink) => sink.add ([]);
356
- void _ignore <T >(Sink <T > sink) {}
Original file line number Diff line number Diff line change 5
5
import 'dart:async' ;
6
6
7
7
import 'async_expand.dart' ;
8
+ import 'common_callbacks.dart' ;
8
9
9
10
/// A utility to take events from the most recent sub stream returned by a
10
11
/// callback.
@@ -126,12 +127,9 @@ extension SwitchLatest<T> on Stream<Stream<T>> {
126
127
if (sub != null ) sub.cancel (),
127
128
];
128
129
if (cancels.isEmpty) return null ;
129
- return cancels.wait.then (_ignore );
130
+ return cancels.wait.then (ignoreArgument );
130
131
};
131
132
};
132
133
return controller.stream;
133
134
}
134
135
}
135
-
136
- /// Helper function to ignore future callback
137
- void _ignore (_, [__]) {}
You can’t perform that action at this time.
0 commit comments