It's impossible to know if an element was delivered in channelFlow #4414

Open
LouisCAD opened this issue Apr 15, 2025 · 10 comments

@LouisCAD
Contributor

Hello,

I'm trying to solve a problem perfectly with the help of channelFlow { … }, but send(…)'s unavoidable prompt-cancellation guarantee is getting in the way.

Use case:

In the snippet below, I want to be able to know if newValue was successfully delivered, and if so, I want to ensure lastEmitElapsedNanos is set.

Unfortunately, because of the prompt cancellation guarantee of Channel's send(…), it's impossible to know whether cancellation happened before or after the channelFlow received the value and passed it downstream.

The onUndeliveredElement parameter of Channel can't be set from channelFlow or callbackFlow, and even if it could be, it would be called in the wrong scope.

Here, I need a variant of send that checks for cancellation only until the value is accepted. If cancellation happens just as the value is delivered, I want it not to throw CancellationException, so that I can record the fact that the value was passed downstream. A sort of "atomic" behavior on exit.

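import android.os.SystemClock
import kotlin.time.Duration
import kotlin.time.Duration.Companion.nanoseconds
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.rateLimit(minInterval: Duration): Flow<T> = channelFlow {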
    var lastEmitElapsedNanos = 0L
    collectLatest { newValue ->
        val nanosSinceLastEmit = SystemClock.elapsedRealtimeNanos() - lastEmitElapsedNanos
        val timeSinceLastEmit = nanosSinceLastEmit.nanoseconds
        val timeToWait = minInterval - timeSinceLastEmit.coerceAtMost(minInterval)
        delay(timeToWait)
        send(newValue) // We don't handle the case where cancellation happens after it's received.
        lastEmitElapsedNanos = SystemClock.elapsedRealtimeNanos()
    }
}.buffer(Channel.RENDEZVOUS)

What do we have now?

send, with its prompt cancellation guarantee, but no way to know whether the value was actually delivered when the coroutine calling send gets cancelled.

What should be instead?

send could have a sendAtomic variant (or similar) that doesn't propagate cancellation if the value was just delivered.

Why?

I'm not aware of any workaround at the moment.

The code above would become correct simply by switching to the new "atomic" version. Today, if newValue is delivered just as the receiver (upstream) flow emits a new value that makes collectLatest cancel the sub-coroutine, lastEmitElapsedNanos is not updated.

Why not?

Not a breaking change if added as an overload (e.g. sendAtomic(…) or send(…, atomic = true)).

Potentially breaking behavior if send is changed instead.

@dkhalanskyjb
Collaborator

This is a splendid brain-teaser! I think I got it—would this work?

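import kotlin.time.Duration
import kotlin.time.Duration.Companion.nanoseconds
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*

/**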
 * Runs [block] in a non-cancellable manner, providing it with a [Deferred] that gets completed whenever cancellation happens.
 */
suspend fun <T> withReifiedCancellation(block: suspend CoroutineScope.(cancelled: Deferred<Unit>) -> T): T {
    val isCancelled = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
    return try {
        withContext(NonCancellable) {
            block(isCancelled)
        }
    } finally {
        isCancelled.cancel()
    }
}

fun <T> Flow<T>.rateLimit(minInterval: Duration): Flow<T> = channelFlow<T> {
    var lastEmitElapsedNanos = 0L
    collectLatest { newValue ->
        withReifiedCancellation { cancelled ->
            val nanosSinceLastEmit = System.nanoTime() - lastEmitElapsedNanos
            val timeSinceLastEmit = nanosSinceLastEmit.nanoseconds
            val timeToWait = minInterval - timeSinceLastEmit.coerceAtMost(minInterval)
            select {
                onTimeout(timeToWait) {
                    select {
                        onSend(newValue) {
                            lastEmitElapsedNanos = System.nanoTime()
                        }
                        cancelled.onJoin {
                        }
                    }
                }
                cancelled.onJoin {
                }
            }
        }
    }
}.buffer(Channel.RENDEZVOUS)

The gist of the solution is that, in kotlinx.coroutines, only cancellation magically trumps atomicity. So if we turn cancellation into something else (here, the completion of a Deferred), we can win back atomicity.
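To make the idea concrete, here is a small sketch. It assumes kotlinx.coroutines on the JVM and reuses the withReifiedCancellation helper from the comment above. Cancelling the outer coroutine does not throw inside the block. It only completes the `cancelled` Deferred, which the block can observe like any other event. The `demo` function and the event strings are illustrative scaffolding, not part of the proposal.

```kotlin
import kotlinx.coroutines.*

// withReifiedCancellation as proposed above: `block` runs in NonCancellable, and
// `cancelled` completes (as cancelled) once the calling coroutine is cancelled.
suspend fun <T> withReifiedCancellation(
    block: suspend CoroutineScope.(cancelled: Deferred<Unit>) -> T
): T {
    val isCancelled = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
    return try {
        withContext(NonCancellable) { block(isCancelled) }
    } finally {
        isCancelled.cancel() // detach from the parent whether or not cancellation happened
    }
}

// Illustrative scaffolding: records what the block observes.
fun demo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val worker = launch {
        withReifiedCancellation { cancelled ->
            events.add("waiting")
            // No CancellationException is thrown here: cancelling `worker` only
            // completes `cancelled`, so join() returns normally.
            cancelled.join()
            events.add("cancellation observed as a plain event")
        }
    }
    yield() // let `worker` start and suspend in join()
    worker.cancelAndJoin()
    events
}

fun main() = println(demo())
```

The block runs to completion after `worker.cancelAndJoin()`, because cancellation has been turned into data rather than an exception.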

@LouisCAD
Contributor Author

LouisCAD commented Apr 17, 2025

Thanks for the prompt response!

This changes the code structure quite significantly, but it inspired me to try making the sendAtomic extension I'm looking for.
I think it's correct: it returns normally if the element was sent, even if cancellation was happening at the same time.

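import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.select

suspend fun <T> SendChannel<T>.sendAtomic(element: T) {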
    val cancellationSignal = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
    try {
        val wasSent: Boolean = withContext(NonCancellable) {
            select {
                onSend(element) { true }
                cancellationSignal.onJoin { false }
            }
        }
        if (wasSent) return else currentCoroutineContext().ensureActive()
    } finally {
        cancellationSignal.cancel()
    }
}
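The following hedged sketch exercises both deterministic paths of this sendAtomic. It assumes a rendezvous Channel and runBlocking on the JVM, and the `demo` scaffolding is illustrative rather than taken from the thread. When a receiver is already waiting, the element is handed over and the call returns normally. When nobody receives and the sender is cancelled, the `onJoin` clause wins and `ensureActive()` rethrows the cancellation. The interesting race, where delivery and cancellation coincide, can't be forced deterministically in a small example, so it is not shown.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// sendAtomic from the comment above, with an explicit type argument on select.
suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
    val cancellationSignal = CompletableDeferred<Unit>(parent = currentCoroutineContext().job)
    try {
        val wasSent: Boolean = withContext(NonCancellable) {
            select<Boolean> {
                onSend(element) { true }            // listed first, so it is preferred
                cancellationSignal.onJoin { false } // completes once the caller is cancelled
            }
        }
        if (wasSent) return else currentCoroutineContext().ensureActive()
    } finally {
        cancellationSignal.cancel()
    }
}

// Illustrative scaffolding exercising the two deterministic outcomes.
fun demo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // 1. A receiver is already waiting: the element is handed over and sendAtomic returns.
    val receiver = async { channel.receive() }
    launch { channel.sendAtomic(1); events.add("sent 1") }.join()
    events.add("received ${receiver.await()}")

    // 2. Nobody receives: cancelling the sender selects the onJoin clause, and
    //    ensureActive() rethrows the cancellation.
    val sender = launch {
        try {
            channel.sendAtomic(2)
            events.add("sent 2")
        } catch (e: CancellationException) {
            events.add("cancelled before delivery")
            throw e
        }
    }
    yield() // let `sender` suspend inside select
    sender.cancelAndJoin()
    events
}

fun main() = println(demo())
```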

@LouisCAD
Contributor Author

Now, I'm wondering whether using NonCancellable could let the suspended coroutine be garbage-collected, since NonCancellable is immutable and therefore doesn't keep references to children 🤔

@dkhalanskyjb
Collaborator

At this point, this can be generalized into a nice combinator:

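import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

public suspend inline fun <R : Any> trySelectAtomically(crossinline builder: SelectBuilder<R>.() -> Unit): R? {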
    val cancellationJob = Job(parent = currentCoroutineContext().job)
    return try {
        withContext(NonCancellable) {
            select {
                builder()
                cancellationJob.onJoin { null }
            }
        }
    } finally {
        cancellationJob.cancel()
    }
}

suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
    trySelectAtomically {
        onSend(element) { }
    } ?: currentCoroutineContext().ensureActive()
}

Regarding garbage collection: currentCoroutineContext().job remembers cancellationJob (as its child), which in turn remembers the cancellationJob.onJoin callback (which it has to call when it gets cancelled), so the suspended select stays reachable.
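The combinator is not limited to sending. Here is a sketch of an atomic receive built on it, assuming kotlinx.coroutines on the JVM. `receiveAtomicOrNull` and `demo` are hypothetical names, not kotlinx.coroutines APIs. The helper returns the element if one was taken from the channel, and null only if cancellation won first, so an element that was already received is not silently dropped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

// The combinator from the comment above.
suspend inline fun <R : Any> trySelectAtomically(crossinline builder: SelectBuilder<R>.() -> Unit): R? {
    val cancellationJob = Job(parent = currentCoroutineContext().job)
    return try {
        withContext(NonCancellable) {
            select<R?> {
                builder()                       // the caller's clauses, listed first
                cancellationJob.onJoin { null } // selected only if cancellation wins
            }
        }
    } finally {
        cancellationJob.cancel()
    }
}

// Hypothetical helper, not part of kotlinx.coroutines: returns the received element,
// or null if the caller was cancelled before any element could be taken.
suspend fun <E : Any> ReceiveChannel<E>.receiveAtomicOrNull(): E? =
    trySelectAtomically<E> { onReceive { it } }

// Illustrative scaffolding.
fun demo(): List<String?> = runBlocking {
    val channel = Channel<String>(Channel.RENDEZVOUS)
    val results = mutableListOf<String?>()

    launch { channel.send("hello") } // a sender that will rendezvous with us
    results.add(channel.receiveAtomicOrNull())

    val receiver = launch { results.add(channel.receiveAtomicOrNull()) }
    yield()                          // let `receiver` suspend inside select
    receiver.cancelAndJoin()         // no sender: the cancellation clause wins
    results
}

fun main() = println(demo())
```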

@LouisCAD
Contributor Author

Nice stuff; it could serve some other use cases.

Regarding GC protection, isn't the Channel also keeping a strong reference to the continuation via onSend?

@dkhalanskyjb
Collaborator

For sendAtomic, yes, but even trySelectAtomically with an empty block is safe.

@LouisCAD
Contributor Author

LouisCAD commented Apr 17, 2025

Made a version that accepts any custom value for the cancellation case:

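import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

/**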
 * [SendChannel.send] can throw a [CancellationException] after the value was sent,
 * which might not be desirable if we want to take into account whether the value was actually
 * sent.
 *
 * That's why this atomic version exists.
 * It supports cancellation, but if the value is sent, it will return instead, even if it's
 * happening concurrently with cancellation.
 *
 * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/4414) for more details.
 */
suspend fun <T> SendChannel<T>.sendAtomic(element: T) {
    trySelectAtomically(onCancellation = { null }) {
        onSend(element) {}
    } ?: currentCoroutineContext().ensureActive()
}

suspend inline fun <R> trySelectAtomically(
    crossinline onCancellation: suspend () -> R,
    crossinline builder: SelectBuilder<R>.() -> Unit
): R? {
    val cancellationSignal = Job(parent = currentCoroutineContext().job)
    try {
        return withContext(NonCancellable) {
            select {
                builder() // We need to be biased towards this clause, so it comes first.
                cancellationSignal.onJoin { onCancellation() }
            }
        }
    } finally {
        cancellationSignal.cancel() // The `builder()` clause could throw, so we need this in the finally block.
    }
}
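The `onCancellation` parameter makes it easy to report the outcome as a value instead of an exception. The sketch below works under the same assumptions (kotlinx.coroutines, JVM, runBlocking). `trySendAtomic` and `demo` are hypothetical names, not library APIs. The helper returns true when the element was delivered and false when cancellation was observed first.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

// The two-parameter trySelectAtomically from the comment above.
suspend inline fun <R> trySelectAtomically(
    crossinline onCancellation: suspend () -> R,
    crossinline builder: SelectBuilder<R>.() -> Unit
): R? {
    val cancellationSignal = Job(parent = currentCoroutineContext().job)
    try {
        return withContext(NonCancellable) {
            select<R> {
                builder()                                      // preferred clauses first
                cancellationSignal.onJoin { onCancellation() } // cancellation as a value
            }
        }
    } finally {
        cancellationSignal.cancel()
    }
}

// Hypothetical helper: true if the element was delivered, false if cancellation won.
suspend fun <T> SendChannel<T>.trySendAtomic(element: T): Boolean =
    trySelectAtomically<Boolean>(onCancellation = { false }) {
        onSend(element) { true }
    } ?: false

// Illustrative scaffolding.
fun demo(): List<Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)
    val results = mutableListOf<Boolean>()

    // A receiver is already suspended in receive(), so the element is delivered: true.
    val receiver = async { channel.receive() }
    yield()
    results.add(channel.trySendAtomic(1))
    receiver.await()

    // Nobody receives; once the sender is cancelled, onCancellation wins: false.
    val sender = launch { results.add(channel.trySendAtomic(2)) }
    yield()
    sender.cancelAndJoin()
    results
}

fun main() = println(demo())
```

Because this variant reports cancellation as `false` instead of throwing, callers that must stop on cancellation still need to check for it themselves (for example with `ensureActive()`).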

@dkhalanskyjb
Collaborator

try/finally is necessary in cases like this:

coroutineScope {
  try {
    trySelectAtomically {
      onTimeout(1.seconds) { throw IllegalStateException() }
    }
  } catch (e: IllegalStateException) {
    // do nothing
  }
} // hangs due to a leaking job
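The following hedged sketch checks the try/finally version against this scenario by running it under withTimeout. It assumes a kotlinx.coroutines version where `onTimeout` accepts a `Duration`, opted in with `@OptIn` because `onTimeout` is marked experimental. When the clause throws, `finally` cancels `cancellationSignal`, so coroutineScope has no leftover child and returns instead of hanging. The `demo` function is illustrative scaffolding.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration.Companion.milliseconds

// The try/finally version of trySelectAtomically from the thread.
suspend inline fun <R> trySelectAtomically(
    crossinline onCancellation: suspend () -> R,
    crossinline builder: SelectBuilder<R>.() -> Unit
): R? {
    val cancellationSignal = Job(parent = currentCoroutineContext().job)
    try {
        return withContext(NonCancellable) {
            select<R> {
                builder()
                cancellationSignal.onJoin { onCancellation() }
            }
        }
    } finally {
        // Runs even when a clause throws, so no active child Job is left behind.
        cancellationSignal.cancel()
    }
}

// The scenario from the comment above, bounded by withTimeout so that a hang
// would surface as a TimeoutCancellationException instead of blocking forever.
@OptIn(ExperimentalCoroutinesApi::class)
fun demo(): String = runBlocking {
    withTimeout(1_000) {
        coroutineScope {
            try {
                trySelectAtomically<Unit>(onCancellation = {}) {
                    onTimeout(10.milliseconds) { throw IllegalStateException("boom") }
                }
            } catch (e: IllegalStateException) {
                // ignored, as in the original example
            }
        }
        "completed"
    }
}

fun main() = println(demo())
```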

@LouisCAD
Contributor Author

Nice catch, thank you! I updated my message to restore the try/finally block, so as not to mislead anyone reading too fast.

@LouisCAD
Contributor Author

LouisCAD commented Apr 22, 2025

With a colleague, I found a version of trySelectAtomically that doesn't need to set the job parent manually, and doesn't need a try/finally block either. Instead, we use a local coroutineScope { … } with a launch { awaitCancellation() } child as the cancellation signal.

I like that it doesn't directly touch a job created outside of the function, and is therefore less likely to break structured concurrency.

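import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select

/**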
 * Always return if a select clause from [builder] was selected,
 * while allowing cancellation, if it happened strictly before one
 * of these clauses could be selected.
 */
private suspend inline fun <R> trySelectAtomically(
    crossinline onCancellation: suspend () -> R,
    crossinline builder: SelectBuilder<R>.() -> Unit
): R? = coroutineScope {
    // We connect `cancellationSignal` to the current job, so it can be used as
    // a secondary select clause below, even though cancellation is blocked
    // using `withContext(NonCancellable)`.
    // The atomic behavior of `select` allows us to get the desired behavior.
    val cancellationSignal = launch { awaitCancellation() }
    withContext(NonCancellable) {
        select {
            builder() // We need to be biased towards these clause(s), so they come first.
            cancellationSignal.onJoin { onCancellation() }
        }.also {
            // If a builder clause was selected, stop this job to allow coroutineScope to complete.
            cancellationSignal.cancel()
        }
    }
}
