-
Notifications
You must be signed in to change notification settings - Fork 455
Concurrent #1132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent #1132
Conversation
Looks like Concurrent depends on Semaphore and Promise, and Semaphore and Promise depend on Concurrent at the same time. Sounds good to me to integrate Semaphore and Promise with uncancellable support using Async, then iterate those to use Concurrent (cancellable) in Concurrent PR. |
modules/effects/arrow-effects-instances/src/main/kotlin/arrow/effects/IOStart.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects-kotlinx-coroutines/src/main/kotlin/arrow/effects/DeferredK.kt
Outdated
Show resolved
Hide resolved
EDIT: Fixed, was cancellation issue which caused a leak. @pakoito @JorgeCastilloPrz I am seeing something really weird and I am unsure what the issue is. I was investigating why the race laws and when using package problem
import arrow.effects.IO
import arrow.effects.instances.io.concurrent.race
import kotlinx.coroutines.Dispatchers
fun bug(fb: IO<Int>): IO<Unit> = IO.defer {
val end = 1000
fun IO<Unit>.loop(i: Int): IO<Unit> =
if (i == end) this
else flatMap { _ ->
race(Dispatchers.IO, IO.just(i), fb)
.flatMap { either ->
either.fold({ IO.just(it) }, { IO.raiseError(IllegalStateException("never() finished race")) })
}.map(::println)
.loop(i + 1)
}
IO.unit.loop(0)
}
fun main(args: Array<String>) =
// bug(IO.sleep(100).map { -1 }).unsafeRunSync() //runs fine
bug(IO.never).unsafeRunSync() //hangs around 62 iterations. |
modules/effects/arrow-effects-instances/src/main/kotlin/arrow/effects/instances/io.kt
Outdated
Show resolved
Hide resolved
...row-effects-kotlinx-coroutines-instances/src/main/kotlin/arrow/effects/DeferredKInstances.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/test/kotlin/arrow/effects/RefTest.kt
Outdated
Show resolved
Hide resolved
modules/streams/arrow-streams/src/main/kotlin/arrow/streams/internal/FreeC.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/typeclasses/Fiber.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/typeclasses/Fiber.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/Semaphore.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/IOBracket.kt
Outdated
Show resolved
Hide resolved
}) | ||
} | ||
|
||
fun <A, B, C> raceN(ctx: CoroutineContext, fa: Kind<F, A>, fb: Kind<F, B>, fc: Kind<F, C>): Kind<F, Either<A, Either<B, C>>> = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eye twitch is this nested Either how it's presented in Scala?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not. They have proper Product
s sigh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pakoito this is how it's currently presented on IO.Companion and is why I choose it. I am not a huge fan of this tho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pakoito I made an attempt to make this nicer. Check the aliases at the bottom of the file. Thoughts?
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/typeclasses/Concurrent.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOStart.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOStart.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOStart.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects-kotlinx-coroutines/src/main/kotlin/arrow/effects/DeferredK.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects-kotlinx-coroutines/src/main/kotlin/arrow/effects/DeferredK.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/Promise.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/Promise.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/Promise.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/Promise.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/KindConnection.kt
Outdated
Show resolved
Hide resolved
Move CancelablePromise & UncancelablePromise to seperate files Refactor UncancelablePromise to be semantic blocking instead of blocking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks very good overall, great job. Thanks for using the chance for adding all those docs and tests that we were still missing. I've added some optional suggestions. Don't want to block it since I'm not 100% sure of those.
modules/core/arrow-core-data/src/main/kotlin/arrow/core/Continuation.kt
Outdated
Show resolved
Hide resolved
modules/core/arrow-extras-extensions/src/main/kotlin/arrow/data/extensions/eithert.kt
Show resolved
Hide resolved
modules/core/arrow-extras-extensions/src/main/kotlin/arrow/data/extensions/eithert.kt
Show resolved
Hide resolved
modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/IORacePair.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/CancelablePromise.kt
Outdated
Show resolved
Hide resolved
modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/CancelablePromise.kt
Show resolved
Hide resolved
modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/CancelablePromise.kt
Show resolved
Hide resolved
modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/ImmediateContext.kt
Outdated
Show resolved
Hide resolved
...les/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/UncancelablePromise.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏 closer to streams 🎉
Amazing. Do I need to approve this once more? 😝. Can we think about merging? |
private fun IOEitherEQ(): Eq<Kind<EitherTPartialOf<ForIO, Throwable>, Either<Throwable, Int>>> = Eq { a, b -> | ||
a.value().attempt().unsafeRunSync() == b.value().attempt().unsafeRunSync() | ||
fun <A> EQ(): Eq<Kind<EitherTPartialOf<ForIO, Throwable>, A>> = Eq { a, b -> | ||
a.value().attempt().unsafeRunTimed(60.seconds) == b.value().attempt().unsafeRunTimed(60.seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* } | ||
*/ | ||
operator fun <F> invoke(n: Long, CF: Concurrent<F>): Kind<F, Semaphore<F>> = CF.run { | ||
assertNonNegative(n).flatMap { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use ULong here? Is that a thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems to be a thing although I have not really looked into this yet. Shall I create a ticket for this?
public inline class ULong @PublishedApi internal constructor(@PublishedApi internal val data: Long) : Comparable<ULong>
import arrow.effects.typeclasses.seconds | ||
import arrow.typeclasses.Eq | ||
|
||
fun <A> EQ(EQA: Eq<A> = Eq.any(), timeout: Duration = 60.seconds): Eq<Kind<ForIO, A>> = Eq { a, b -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have at least 2 uses of this that I've seen during the review. We should replace those too in another PR.
@@ -10,6 +10,14 @@ import arrow.effects.rx2.extensions.observablek.monad.monad | |||
import arrow.effects.rx2.extensions.observablek.monadError.monadError | |||
import arrow.effects.rx2.fix | |||
import arrow.effects.typeclasses.* | |||
import arrow.effects.typeclasses.Async |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these necessary when we already have import arrow.effects.typeclasses.*
in line 12?
import java.util.concurrent.atomic.AtomicReference | ||
import kotlin.coroutines.CoroutineContext | ||
|
||
@Suppress("LargeClass") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol
@@ -56,7 +55,13 @@ sealed class IO<out A> : IOOf<A> { | |||
IO { callback(Left(t)) } | |||
} | |||
|
|||
IORunLoop.startCancelable(fa, conn2, mapUnit) | |||
//DEV: If fa cancels conn2 like so `conn.cancel().map { cb(Right(Unit)) }` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space after //
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sort of things should be reported by detekt / ktlint if we care enough
//It doesn't run the stack of conn2, instead the result is seen it cb of startCancelable. | ||
IORunLoop.startCancelable(fa, conn2) { | ||
|
||
if (it.fold({ e -> e == OnCancel.CancellationException }, { false }) && conn.isNotCanceled()) IORunLoop.start(conn.cancel(), mapUnit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: take the fold into boolean in a new variable. Put the comment on that variable instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't review all of it in one go, too many files. I've sent some comments, and I haven't checked the new, longer files such as CancellablePromise
cancelable
method.Blocked by Async.asyncF #1124Blocked by MVar #1171 & Async.asyncF #1124Blocked by Async.asyncF #1124Blocked by Async.asyncF #1124Blocked by Async.asyncF #1124Excluded from scope of this PR.
Nice to have[ ] Write RxJava instance[ ] Write Reactor instance[ ] Write Deferred instance