It's impossible to know if an element was delivered in channelFlow
#4414
Comments
This is a splendid brain-teaser! I think I got it. Would this work?

    /**
     * Runs [block] in a non-cancellable manner, providing it with a [Deferred] that gets completed whenever cancellation happens.
     */
    suspend fun <T> withReifiedCancellation(block: suspend CoroutineScope.(cancelled: Deferred<Unit>) -> T): T {
        val isCancelled = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
        return try {
            withContext(NonCancellable) {
                block(isCancelled)
            }
        } finally {
            isCancelled.cancel()
        }
    }

    fun <T> Flow<T>.rateLimit(minInterval: Duration): Flow<T> = channelFlow<T> {
        var lastEmitElapsedNanos = 0L
        collectLatest { newValue ->
            withReifiedCancellation { cancelled ->
                val nanosSinceLastEmit = System.nanoTime() - lastEmitElapsedNanos
                val timeSinceLastEmit = nanosSinceLastEmit.nanoseconds
                val timeToWait = minInterval - timeSinceLastEmit.coerceAtMost(minInterval)
                select {
                    onTimeout(timeToWait) {
                        select {
                            onSend(newValue) {
                                lastEmitElapsedNanos = System.nanoTime()
                            }
                            cancelled.onJoin {
                            }
                        }
                    }
                    cancelled.onJoin {
                    }
                }
            }
        }
    }.buffer(Channel.RENDEZVOUS)

The gist of the solution is that only the cancellation is magically trumping atomicity in |
Thanks for the prompt response! This changes the code structure quite significantly, but it inspired me to try making the

    suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
        val cancellationSignal = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
        try {
            val wasSent: Boolean = withContext(NonCancellable) {
                select {
                    onSend(element) { true }
                    cancellationSignal.onJoin { false }
                }
            }
            if (wasSent) return else currentCoroutineContext().ensureActive()
        } finally {
            cancellationSignal.cancel()
        }
    }

|
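For readers who want to see the difference in practice, here is a minimal sketch of the race discussed in this thread. It reuses the `sendAtomic` from the comment above; `senderSawDelivery` is a hypothetical helper written only for this illustration. It assumes a single-threaded `runBlocking` event loop, so that the receiver can take the element and cancel the sender before the sender is resumed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.select

// Same logic as the sendAtomic posted in the comment above.
suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
    val cancellationSignal = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
    try {
        val wasSent: Boolean = withContext(NonCancellable) {
            select<Boolean> {
                onSend(element) { true }
                cancellationSignal.onJoin { false }
            }
        }
        if (wasSent) return else currentCoroutineContext().ensureActive()
    } finally {
        cancellationSignal.cancel()
    }
}

// Hypothetical helper: receives the element, then cancels the sender before it resumes.
// Returns whether the sender itself got past its send call.
suspend fun senderSawDelivery(sendOp: suspend SendChannel<Int>.(Int) -> Unit): Boolean =
    coroutineScope {
        val channel = Channel<Int>(Channel.RENDEZVOUS)
        var sawDelivery = false
        val sender = launch {
            channel.sendOp(1)
            sawDelivery = true // no suspension point between the send and this line
        }
        yield()                       // let the sender start and suspend on the channel
        check(channel.receive() == 1) // the element is handed over here
        sender.cancelAndJoin()        // cancel before the sender has been resumed
        sawDelivery
    }

fun main() = runBlocking {
    println("send:       ${senderSawDelivery { send(it) }}")
    println("sendAtomic: ${senderSawDelivery { sendAtomic(it) }}")
}
```

Under those assumptions I would expect the plain `send` run to report `false` (the element was received, yet `send` threw `CancellationException`) and the `sendAtomic` run to report `true`. On a multi-threaded dispatcher the interleaving is not guaranteed, so treat this as an illustration rather than a test of the library.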
Now, I'm wondering if using |
At this point, this can be generalized into a nice combinator:

    public suspend inline fun <R : Any> trySelectAtomically(crossinline builder: SelectBuilder<R>.() -> Unit): R? {
        val cancellationJob = Job(parent = currentCoroutineContext().job)
        return try {
            withContext(NonCancellable) {
                select {
                    builder()
                    cancellationJob.onJoin { null }
                }
            }
        } finally {
            cancellationJob.cancel()
        }
    }

    suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
        trySelectAtomically {
            onSend(element) { }
        } ?: currentCoroutineContext().ensureActive()
    }

Regarding garbage collection: |
Nice stuff, could serve some other use cases. Regarding GC protection, isn't the |
For |
Made a version that accepts any custom value for the cancellation case,

    /**
     * [SendChannel.send] can throw a [CancellationException] after the value was sent,
     * which might not be desirable if we want to factor in whether the value was actually
     * sent.
     *
     * That's why this atomic version exists.
     * It supports cancellation, but if the value is sent, it will return instead, even if it's
     * happening concurrently to cancellation.
     *
     * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/4414) for more details.
     */
    suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
        trySelectAtomically(onCancellation = { null }) {
            onSend(element) {}
        } ?: currentCoroutineContext().ensureActive()
    }

    suspend inline fun <R> trySelectAtomically(
        crossinline onCancellation: suspend () -> R,
        crossinline builder: SelectBuilder<R>.() -> Unit
    ): R? {
        val cancellationSignal = Job(parent = currentCoroutineContext().job)
        try {
            return withContext(NonCancellable) {
                select {
                    builder() // We need to be biased towards this clause, so it comes first.
                    cancellationSignal.onJoin { onCancellation() }
                }
            }
        } finally {
            cancellationSignal.cancel() // The `builder()` clause could throw, so we need this in the finally block.
        }
    }

|
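To tie this back to the use case in the issue, here is a rough sketch of how a rate-limiting operator might look on top of the `Job`-based combinator above. It is not the original snippet from the issue: `lastEmit`, the use of `TimeSource.Monotonic`, and the `Boolean` result in place of `null` are my own choices for illustration. Only the send is made atomic; the wait before it stays normally cancellable, since no delivered value is at stake there.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Same structure as the combinator above, with a non-nullable result type.
suspend inline fun <R> trySelectAtomically(
    crossinline onCancellation: suspend () -> R,
    crossinline builder: SelectBuilder<R>.() -> Unit
): R {
    val cancellationSignal = Job(parent = currentCoroutineContext().job)
    try {
        return withContext(NonCancellable) {
            select<R> {
                builder() // listed first so it wins over the cancellation clause
                cancellationSignal.onJoin { onCancellation() }
            }
        }
    } finally {
        cancellationSignal.cancel() // always release the child job
    }
}

suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
    val sent = trySelectAtomically(onCancellation = { false }) {
        onSend(element) { true }
    }
    if (!sent) currentCoroutineContext().ensureActive()
}

// Illustrative operator: remembers the time of the last delivery, not of the last attempt.
fun <T> Flow<T>.rateLimit(minInterval: Duration): Flow<T> = channelFlow {
    var lastEmit: TimeMark? = null
    collectLatest { newValue ->
        val sinceLastEmit = lastEmit?.elapsedNow()
        if (sinceLastEmit != null && sinceLastEmit < minInterval) {
            delay(minInterval - sinceLastEmit) // cancellable wait
        }
        sendAtomic(newValue)
        // Reached whenever the value was delivered, even if cancellation raced with it.
        lastEmit = TimeSource.Monotonic.markNow()
    }
}.buffer(Channel.RENDEZVOUS)

fun main() = runBlocking {
    val result = flowOf(1, 2, 3).rateLimit(50.milliseconds).toList()
    println(result)
}
```

Which of the earlier values reach the collector depends on scheduling, but the last value of the upstream flow should always come through, because `collectLatest` never cancels its final block.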
    coroutineScope {
        try {
            trySelectAtomically {
                onTimeout(1.seconds) { throw IllegalStateException() }
            }
        } catch (e: IllegalStateException) {
            // do nothing
        }
    } // hangs due to a leaking job

|
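The hang reported above comes down to a general property of structured concurrency: a `Job(parent = …)` is a child of that parent, and the parent cannot complete until the child is completed or cancelled. A minimal sketch of just that mechanism, independent of `select`, could look like this (`scopeCompletes` is a hypothetical helper, and the 200 ms timeout is only there so that the demo terminates):

```kotlin
import kotlinx.coroutines.*

// Returns true if the scope completed on its own, false if the timeout had to cancel it.
suspend fun scopeCompletes(cleanUp: Boolean): Boolean =
    withTimeoutOrNull(200) {
        coroutineScope<Unit> {
            // Child job attached to this scope, like the cancellation signal in the thread.
            val signal = Job(parent = currentCoroutineContext().job)
            // If this cancel is skipped (for instance because an exception bypassed it),
            // the scope keeps waiting for `signal`.
            if (cleanUp) signal.cancel()
        }
        true
    } ?: false

fun main() = runBlocking {
    println("without cleanup: ${scopeCompletes(cleanUp = false)}")
    println("with cleanup:    ${scopeCompletes(cleanUp = true)}")
}
```

I would expect the first call to report `false` and the second `true`, which is why the cancel call belongs in a `finally` block when a select clause may throw.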
Nice catch, thank you! I updated my message to restore the try/finally block and not mislead anyone reading too fast. |
With a colleague, I found a version of I like that it doesn't touch a job created outside of the function directly, and is therefore less close to breaking structured concurrency.

    /**
     * Always returns if a select clause from [builder] was selected,
     * while allowing cancellation, if it happened strictly before one
     * of these clauses could be selected.
     */
    private suspend inline fun <R> trySelectAtomically(
        crossinline onCancellation: suspend () -> R,
        crossinline builder: SelectBuilder<R>.() -> Unit
    ): R? = coroutineScope {
        // We connect `cancellationSignal` to the current job, so it can be used as
        // a secondary select clause below, even though cancellation is blocked
        // using `withContext(NonCancellable)`.
        // The atomic behavior of `select` allows us to get the desired behavior.
        val cancellationSignal = launch { awaitCancellation() }
        withContext(NonCancellable) {
            select {
                builder() // We need to be biased towards this/these clause(s), so they come first.
                cancellationSignal.onJoin { onCancellation() }
            }.also {
                // If a builder clause was selected, stop this job to allow coroutineScope to complete.
                cancellationSignal.cancel()
            }
        }
    }

|
Hello,

I'm trying to solve a problem perfectly with the help of `channelFlow { … }`, but `send(…)`'s unavoidable prompt-cancellation guarantee is getting in the way.

Use case:

In the snippet below, I want to be able to know if `newValue` was successfully delivered, and if so, I want to ensure `lastEmitElapsedNanos` is set.

Unfortunately, because of `Channel`'s `send(…)` prompt cancellation guarantee, it's impossible to know if cancellation happened before or after the `channelFlow` received the value and passed it downstream.

The `onUndeliveredElement` parameter of `Channel` can't be set from `channelFlow`, nor `callbackFlow`, and even if it were possible, it'd be called in the wrong scope.

Here, I need a variant of `send` that checks cancellation only until the value is accepted: if cancellation happens just as the value is delivered, I want it to not throw `CancellationException`, and to let me keep the fact that the value was passed downstream. A sort of "atomic" behavior on exit.

What do we have now?

`send` with prompt cancellation guarantees, but with no way to know if the value was actually delivered when the coroutine calling `send` can be cancelled.

What should be instead?

`send` could have a `sendAtomic` variant, or something, that doesn't propagate cancellation if the value was just delivered.

Why?

I'm not aware of any workaround at the moment.
The code above would become instantly correct after using the new "atomic" version, instead of not updating `lastEmitElapsedNanos` when `newValue` was delivered while the receiver flow got a new value that made `collectLatest` cancel the sub-coroutine.

Why not?

Not a breaking change if added as an overload (e.g. `sendAtomic(…)` or `send(…, atomic = true)`).
Potentially breaking behavior if `send` is changed instead.