Skip to content

Concurrent #1132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 125 commits into from
Jan 13, 2019
Merged

Concurrent #1132

merged 125 commits into from
Jan 13, 2019

Conversation

nomisRev
Copy link
Member

@nomisRev nomisRev commented Nov 16, 2018

Excluded from scope of this PR.
Nice to have
[ ] Write RxJava instance
[ ] Write Reactor instance
[ ] Write Deferred instance

@JorgeCastilloPrz
Copy link
Member

Looks like Concurrent depends on Semaphore and Promise, and Semaphore and Promise depend on Concurrent at the same time.

Sounds good to me to integrate Semaphore and Promise with uncancellable support using Async, then iterate those to use Concurrent (cancellable) in Concurrent PR.

@nomisRev
Copy link
Member Author

nomisRev commented Nov 27, 2018

EDIT: Fixed, was cancellation issue which caused a leak.

@pakoito @JorgeCastilloPrz I am seeing something really weird and I am unsure what the issue is. I was investigating why the race laws and when using IO.never or IO.async().never() these law fail so I made this example but became none the wiser. Any thoughts or directions?

package problem

import arrow.effects.IO
import arrow.effects.instances.io.concurrent.race
import kotlinx.coroutines.Dispatchers

fun bug(fb: IO<Int>): IO<Unit> = IO.defer {
  val end = 1000

  fun IO<Unit>.loop(i: Int): IO<Unit> =
    if (i == end) this
    else flatMap { _ ->
      race(Dispatchers.IO, IO.just(i), fb)
        .flatMap { either ->
          either.fold({ IO.just(it) }, { IO.raiseError(IllegalStateException("never() finished race")) })
        }.map(::println)
        .loop(i + 1)
    }

  IO.unit.loop(0)
}

fun main(args: Array<String>) =
//  bug(IO.sleep(100).map { -1 }).unsafeRunSync() //runs fine
  bug(IO.never).unsafeRunSync() //hangs around 62 iterations.

})
}

fun <A, B, C> raceN(ctx: CoroutineContext, fa: Kind<F, A>, fb: Kind<F, B>, fc: Kind<F, C>): Kind<F, Either<A, Either<B, C>>> =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eye twitch is this nested Either how it's presented in Scala?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not. They have proper Products sigh

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pakoito this is how it's currently presented on IO.Companion and is why I choose it. I am not a huge fan of this tho.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pakoito I made an attempt to make this nicer. Check the aliases at the bottom of the file. Thoughts?

Copy link
Member

@JorgeCastilloPrz JorgeCastilloPrz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very good overall, great job. Thanks for using the chance for adding all those docs and tests that we were still missing. I've added some optional suggestions. Don't want to block it since I'm not 100% sure of those.

Copy link
Member

@JorgeCastilloPrz JorgeCastilloPrz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job

Copy link
Member

@raulraja raulraja left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 closer to streams 🎉

@JorgeCastilloPrz
Copy link
Member

Amazing. Do I need to approve this once more? 😝. Can we think about merging?

private fun IOEitherEQ(): Eq<Kind<EitherTPartialOf<ForIO, Throwable>, Either<Throwable, Int>>> = Eq { a, b ->
a.value().attempt().unsafeRunSync() == b.value().attempt().unsafeRunSync()
fun <A> EQ(): Eq<Kind<EitherTPartialOf<ForIO, Throwable>, A>> = Eq { a, b ->
a.value().attempt().unsafeRunTimed(60.seconds) == b.value().attempt().unsafeRunTimed(60.seconds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* }
*/
operator fun <F> invoke(n: Long, CF: Concurrent<F>): Kind<F, Semaphore<F>> = CF.run {
assertNonNegative(n).flatMap {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use ULong here? Is that a thing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems to be a thing although I have not really looked into this yet. Shall I create a ticket for this?

public inline class ULong @PublishedApi internal constructor(@PublishedApi internal val data: Long) : Comparable<ULong>

import arrow.effects.typeclasses.seconds
import arrow.typeclasses.Eq

fun <A> EQ(EQA: Eq<A> = Eq.any(), timeout: Duration = 60.seconds): Eq<Kind<ForIO, A>> = Eq { a, b ->
Copy link
Member

@pakoito pakoito Jan 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have at least 2 uses of this that I've seen during the review. We should replace those too in another PR.

@@ -10,6 +10,14 @@ import arrow.effects.rx2.extensions.observablek.monad.monad
import arrow.effects.rx2.extensions.observablek.monadError.monadError
import arrow.effects.rx2.fix
import arrow.effects.typeclasses.*
import arrow.effects.typeclasses.Async
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these necessary when we already have import arrow.effects.typeclasses.* in line 12?

import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext

@Suppress("LargeClass")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

@@ -56,7 +55,13 @@ sealed class IO<out A> : IOOf<A> {
IO { callback(Left(t)) }
}

IORunLoop.startCancelable(fa, conn2, mapUnit)
//DEV: If fa cancels conn2 like so `conn.cancel().map { cb(Right(Unit)) }`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after //

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sort of things should be reported by detekt / ktlint if we care enough

//It doesn't run the stack of conn2, instead the result is seen it cb of startCancelable.
IORunLoop.startCancelable(fa, conn2) {

if (it.fold({ e -> e == OnCancel.CancellationException }, { false }) && conn.isNotCanceled()) IORunLoop.start(conn.cancel(), mapUnit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: take the fold into boolean in a new variable. Put the comment on that variable instead

Copy link
Member

@pakoito pakoito left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't review all of it in one go, too many files. I've sent some comments, and I haven't checked the new, longer files such as CancellablePromise

@nomisRev nomisRev merged commit 72b4c13 into master Jan 13, 2019
@nomisRev nomisRev deleted the simon-concurrent branch January 13, 2019 17:50
@nomisRev nomisRev mentioned this pull request Jan 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants