diff --git a/build.sbt b/build.sbt index 9ff11ed0..29832e4c 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,8 @@ // Our Scala versions. -lazy val `scala-2.12` = "2.12.13" lazy val `scala-2.13` = "2.13.6" -lazy val `scala-3.0` = "3.0.2" +lazy val `scala-3` = "3.0.2" // This is used in a couple places lazy val fs2Version = "3.1.1" @@ -46,7 +45,7 @@ lazy val commonSettings = Seq( // Compilation scalaVersion := `scala-2.13`, - crossScalaVersions := Seq(`scala-2.12`, `scala-2.13`, `scala-3.0`), + crossScalaVersions := Seq(`scala-2.13`, `scala-3`), scalacOptions -= "-language:experimental.macros", // doesn't work cross-version Compile / doc / scalacOptions --= Seq("-Xfatal-warnings"), Compile / doc / scalacOptions ++= Seq( @@ -115,6 +114,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform) "org.scodec" %%% "scodec-cats" % "1.1.0", "org.tpolecat" %%% "natchez-core" % natchezVersion, "org.tpolecat" %%% "sourcepos" % "1.0.1", + "org.tpolecat" %%% "pool-party" % "0.0.4", "org.scala-lang.modules" %%% "scala-collection-compat" % "2.4.4", ) ++ Seq( "com.beachape" %%% "enumeratum" % "1.6.1", diff --git a/modules/core/shared/src/main/scala/Session.scala b/modules/core/shared/src/main/scala/Session.scala index 873c63f0..5df8ecd6 100644 --- a/modules/core/shared/src/main/scala/Session.scala +++ b/modules/core/shared/src/main/scala/Session.scala @@ -13,6 +13,7 @@ import fs2.io.net.{ Network, SocketGroup } import fs2.Pipe import fs2.Stream import natchez.Trace +import org.tpolecat.poolparty.PooledResourceBuilder import skunk.codec.all.bool import skunk.data._ import skunk.net.Protocol @@ -22,6 +23,9 @@ import skunk.net.SSLNegotiation import skunk.data.TransactionIsolationLevel import skunk.data.TransactionAccessMode import skunk.net.protocol.Describe +import org.tpolecat.poolparty.PoolEvent +import org.tpolecat.poolparty.PoolEvent.FinalizerFailure +import org.tpolecat.poolparty.PoolEvent.HealthCheckFailure /** * Represents a live connection to a Postgres database. Operations provided here are safe to use @@ -235,6 +239,26 @@ object Session { } + object PoolReporters { + + /** + * A very minimal pool event reporter, which dumps all pool events when `debug` is true + * and dumps stack traces if a health check or finalizer fails with an exception (otherwise + * these exceptions are lost since they're serviced by a worker thread). We should make this + * more configurable. + * @param debug if true, log all events + */ + def default[F[_]: Console: Applicative](debug: Boolean): PoolEvent[Session[F]] => F[Unit] = e => + Console[F].println(s"Pool: $e").whenA(debug) *> { + e match { + case FinalizerFailure(_, _, _, ex) => Console[F].printStackTrace(ex) + case HealthCheckFailure(_, _, _, ex) => Console[F].printStackTrace(ex) + case _ => Applicative[F].unit + } + } + + } + /** * Resource yielding a `SessionPool` managing up to `max` concurrent `Session`s. 
Typically you * will `use` this resource once on application startup and pass the resulting @@ -259,7 +283,7 @@ object Session { * @param queryCache Size of the cache for query checking * @group Constructors */ - def pooled[F[_]: Concurrent: Trace: Network: Console]( + def pooled[F[_]: Temporal: Trace: Network: Console]( host: String, port: Int = 5432, user: String, @@ -282,7 +306,10 @@ object Session { for { dc <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache)) sslOp <- Resource.eval(ssl.toSSLNegotiationOptions(if (debug) logger.some else none)) - pool <- Pool.of(session(Network[F], sslOp, dc), max)(Recyclers.full) + pool <- PooledResourceBuilder.of(session(Network[F], sslOp, dc), max) + .withHealthCheck(Recyclers.full[F].run) + .withReporter(PoolReporters.default[F](debug)) + .build } yield pool } @@ -293,7 +320,7 @@ object Session { * single-session pool. This method is shorthand for `Session.pooled(..., max = 1, ...).flatten`. * @see pooled */ - def single[F[_]: Concurrent: Trace: Network: Console]( + def single[F[_]: Temporal: Trace: Network: Console]( host: String, port: Int = 5432, user: String, diff --git a/modules/core/shared/src/main/scala/util/Pool.scala b/modules/core/shared/src/main/scala/util/Pool.scala deleted file mode 100644 index fddeb333..00000000 --- a/modules/core/shared/src/main/scala/util/Pool.scala +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) 2018-2021 by Rob Norris -// This software is licensed under the MIT License (MIT). -// For more information see LICENSE or https://opensource.org/licenses/MIT - -package skunk.util - -import cats.effect.Concurrent -import cats.effect.Deferred -import cats.effect.Ref -import cats.effect.implicits._ -import cats.effect.Resource -import cats.syntax.all._ -import skunk.exception.SkunkException -import natchez.Trace - -object Pool { - - /** Class of exceptions raised when a resource leak is detected on pool finalization. */ - final case class ResourceLeak(expected: Int, actual: Int, deferrals: Int) - extends SkunkException( - sql = None, - message = s"A resource leak was detected during pool finalization.", - detail = Some(s"Expected $expected active slot(s) and no deferrals, found $actual slots and $deferrals deferral(s)."), - hint = Some(""" - |The most common causes of resource leaks are (a) using a pool on a fiber that was neither - |joined or canceled prior to pool finalization, and (b) using `Resource.allocated` and - |failing to finalize allocated resources prior to pool finalization. - """.stripMargin.trim.linesIterator.mkString(" ")) - ) - - /** - * Exception raised to deferrals that remain during pool finalization. This indicates a - * programming error, typically misuse of fibers. - */ - object ShutdownException - extends SkunkException( - sql = None, - message = "The pool is being finalized and no more resources are available.", - hint = Some(""" - |The most common cause of this exception is using a pool on a fiber that was neither - |joined or canceled prior to pool finalization. - """.stripMargin.trim.linesIterator.mkString(" ")) - ) - - /** - * A pooled resource (which is itself a managed resource). - * @param rsrc the underlying resource to be pooled - * @param size maximum size of the pool (must be positive) - * @param recycler a cleanup/health-check to be done before elements are returned to the pool; - * yielding false here means the element should be freed and removed from the pool. 
- */ - def of[F[_]: Concurrent: Trace, A]( - rsrc: Resource[F, A], - size: Int)( - recycler: Recycler[F, A] - ): Resource[F, Resource[F, A]] = { - - // Just in case. - assert(size > 0, s"Pool size must be positive (you passed $size).") - - // The type of thing allocated by rsrc. - type Alloc = (A, F[Unit]) - - // Our pool state is a pair of queues, implemented as lists because I am lazy and it's not - // going to matter. - type State = ( - List[Option[Alloc]], // deque of alloc slots (filled on the left, empty on the right) - List[Deferred[F, Either[Throwable, Alloc]]] // queue of deferrals awaiting allocs - ) - - // We can construct a pool given a Ref containing our initial state. - def poolImpl(ref: Ref[F, State]): Resource[F, A] = { - - // To give out an alloc we create a deferral first, which we will need if there are no slots - // available. If there is a filled slot, remove it and yield its alloc. If there is an empty - // slot, remove it and allocate. If there are no slots, enqueue the deferral and wait on it, - // which will [semantically] block the caller until an alloc is returned to the pool. - val give: F[Alloc] = - Trace[F].span("pool.allocate") { - Deferred[F, Either[Throwable, Alloc]].flatMap { d => - - // If allocation fails for any reason then there's no resource to return to the pool - // later, so in this case we have to append a new empty slot to the queue. We do this in - // a couple places here so we factored it out. - val restore: PartialFunction[Throwable, F[Unit]] = { - case _ => ref.update { case (os, ds) => (os :+ None, ds) } - } - - // Here we go. The cases are a full slot (done), an empty slot (alloc), and no slots at - // all (defer and wait). - ref.modify { - case (Some(a) :: os, ds) => ((os, ds), a.pure[F]) - case (None :: os, ds) => ((os, ds), Concurrent[F].onError(rsrc.allocated)(restore)) - case (Nil, ds) => ((Nil, ds :+ d), d.get.flatMap(_.liftTo[F].onError(restore))) - } .flatten - - } - } - - // To take back an alloc we nominally just hand it out or push it back onto the queue, but - // there are a bunch of error conditions to consider. This operation is a finalizer and - // cannot be canceled, so we don't need to worry about that case here. - def take(a: Alloc): F[Unit] = - Trace[F].span("pool.free") { - recycler(a._1).onError { - case t => dispose(a) *> t.raiseError[F, Unit] - } flatMap { - case true => recycle(a) - case false => dispose(a) - } - } - - // Return `a` to the pool. If there are awaiting deferrals, complete the next one. Otherwise - // push a filled slot into the queue. - def recycle(a: Alloc): F[Unit] = - Trace[F].span("recycle") { - ref.modify { - case (os, d :: ds) => ((os, ds), d.complete(a.asRight).void) // hand it back out - case (os, Nil) => ((Some(a) :: os, Nil), ().pure[F]) // return to pool - } .flatten - } - - // Something went wrong when returning `a` to the pool so let's dispose of it and figure out - // how to clean things up. If there are no deferrals, append an empty slot to take the place - // of `a`. If there are deferrals, remove the next one and complete it (failures in allocation - // are handled by the awaiting deferral in `give` above). Always finalize `a` - def dispose(a: Alloc): F[Unit] = - Trace[F].span("dispose") { - ref.modify { - case (os, Nil) => ((os :+ None, Nil), ().pure[F]) // new empty slot - case (os, d :: ds) => ((os, ds), Concurrent[F].attempt(rsrc.allocated).flatMap(d.complete).void) // alloc now! - } .guarantee(a._2).flatten - } - - // Hey, that's all we need to create our resource! 
- Resource.make(give)(take).map(_._1) - - } - - // The pool itself is really just a wrapper for its state ref. - def alloc: F[Ref[F, State]] = - Ref[F].of((List.fill(size)(None), Nil)) - - // When the pool shuts down we finalize all the allocs, which should have been returned by now. - // Any remaining deferrals (there should be none, but can be due to poor fiber hygeine) are - // completed with `ShutdownException`. - def free(ref: Ref[F, State]): F[Unit] = - ref.get.flatMap { - - // Complete all awaiting deferrals with a `ShutdownException`, then raise an error if there - // are fewer slots than the pool size. Both conditions can be provoked by poor resource - // hygiene (via fibers typically). Then finalize any remaining pooled elements. Failure of - // pool finalization may result in unfinalized resources. To be improved. - case (os, ds) => - ds.traverse(_.complete(Left(ShutdownException))) *> - ResourceLeak(size, os.length, ds.length).raiseError[F, Unit].whenA(os.length != size) *> - os.traverse_ { - case Some((_, free)) => free - case None => ().pure[F] - } - - } - - Resource.make(alloc)(free).map(poolImpl) - - } - -} \ No newline at end of file diff --git a/modules/tests/shared/src/test/scala/PoolTest.scala b/modules/tests/shared/src/test/scala/PoolTest.scala deleted file mode 100644 index 5fa5dcc8..00000000 --- a/modules/tests/shared/src/test/scala/PoolTest.scala +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright (c) 2018-2021 by Rob Norris -// This software is licensed under the MIT License (MIT). -// For more information see LICENSE or https://opensource.org/licenses/MIT - -package tests - -import ffstest.FTest -import cats.effect.IO -import cats.effect.Resource -import cats.syntax.all._ -import scala.concurrent.duration._ -import skunk.util.Pool -import cats.effect.Ref -import skunk.util.Pool.ResourceLeak -import cats.effect.Deferred -import scala.util.Random -import skunk.util.Pool.ShutdownException -import natchez.Trace.Implicits.noop -import skunk.util.Recycler - -class PoolTest extends FTest { - - case class UserFailure() extends Exception("user failure") - case class AllocFailure() extends Exception("allocation failure") - case class FreeFailure() extends Exception("free failure") - case class ResetFailure() extends Exception("reset failure") - - val ints: IO[Resource[IO, Int]] = - Ref[IO].of(1).map { ref => - val next = ref.modify(n => (n + 1, n.pure[IO])).flatten - Resource.make(next)(_ => IO.unit) - } - - // list of computations into computation that yields results one by one - def yielding[A](fas: IO[A]*): IO[IO[A]] = - Ref[IO].of(fas.toList).map { ref => - ref.modify { - case Nil => (Nil, IO.raiseError(new Exception("No more values!"))) - case fa :: fas => (fas, fa) - } .flatten - } - - def resourceYielding[A](fas: IO[A]*): IO[Resource[IO, A]] = - yielding(fas: _*).map(Resource.make(_)(_ => IO.unit)) - - // This test leaks - test("error in alloc is rethrown to caller (immediate)") { - val rsrc = Resource.make(IO.raiseError[String](AllocFailure()))(_ => IO.unit) - val pool = Pool.of(rsrc, 42)(Recycler.success) - pool.use(_.use(_ => IO.unit)).assertFailsWith[AllocFailure] - } - - test("error in alloc is rethrown to caller (deferral completion following errored cleanup)") { - resourceYielding(IO(1), IO.raiseError(AllocFailure())).flatMap { r => - val p = Pool.of(r, 1)(Recycler[IO, Int](_ => IO.raiseError(ResetFailure()))) - p.use { r => - for { - d <- Deferred[IO, Unit] - f1 <- r.use(n => assertEqual("n should be 1", n, 1) *> d.get).assertFailsWith[ResetFailure].start - f2 <- 
r.use(_ => fail[Int]("should never get here")).assertFailsWith[AllocFailure].start - _ <- d.complete(()) - _ <- f1.join - _ <- f2.join - } yield () - } - } - } - - test("error in alloc is rethrown to caller (deferral completion following failed cleanup)") { - resourceYielding(IO(1), IO.raiseError(AllocFailure())).flatMap { r => - val p = Pool.of(r, 1)(Recycler.failure) - p.use { r => - for { - d <- Deferred[IO, Unit] - f1 <- r.use(n => assertEqual("n should be 1", n, 1) *> d.get).start - f2 <- r.use(_ => fail[Int]("should never get here")).assertFailsWith[AllocFailure].start - _ <- d.complete(()) - _ <- f1.join - _ <- f2.join - } yield () - } - } - } - - test("provoke dangling deferral cancellation") { - ints.flatMap { r => - val p = Pool.of(r, 1)(Recycler.failure) - Deferred[IO, Either[Throwable, Int]].flatMap { d1 => - p.use { r => - for { - d <- Deferred[IO, Unit] - _ <- r.use(_ => d.complete(()) *> IO.never).start // leaked forever - _ <- d.get // make sure the resource has been allocated - f <- r.use(_ => fail[Int]("should never get here")).attempt.flatMap(d1.complete).start // defer - _ <- IO.sleep(100.milli) // ensure that the fiber has a chance to run - } yield f - } .assertFailsWith[ResourceLeak].flatMap { - case ResourceLeak(1, 0, 1) => d1.get.flatMap(_.liftTo[IO]) - case e => e.raiseError[IO, Unit] - } .assertFailsWith[ShutdownException.type].void - } - }} - - test("error in free is rethrown to caller") { - val rsrc = Resource.make("foo".pure[IO])(_ => IO.raiseError(FreeFailure())) - val pool = Pool.of(rsrc, 42)(Recycler.success) - pool.use(_.use(_ => IO.unit)).assertFailsWith[FreeFailure] - } - - test("error in reset is rethrown to caller") { - val rsrc = Resource.make("foo".pure[IO])(_ => IO.unit) - val pool = Pool.of(rsrc, 42)(Recycler[IO, String](_ => IO.raiseError(ResetFailure()))) - pool.use(_.use(_ => IO.unit)).assertFailsWith[ResetFailure] - } - - test("reuse on serial access") { - ints.map(Pool.of(_, 3)(Recycler.success)).flatMap { factory => - factory.use { pool => - pool.use { n => - assertEqual("first num should be 1", n, 1) - } *> - pool.use { n => - assertEqual("we should get it again", n, 1) - } - } - } - } - - test("allocation on nested access") { - ints.map(Pool.of(_, 3)(Recycler.success)).flatMap { factory => - factory.use { pool => - pool.use { n => - assertEqual("first num should be 1", n, 1) *> - pool.use { n => - assertEqual("but this one should be 2", n, 2) - } *> - pool.use { n => - assertEqual("and again", n, 2) - } - } - } - } - } - - test("allocated resource can cause a leak, which will be detected on finalization") { - ints.map(Pool.of(_, 3)(Recycler.success)).flatMap { factory => - factory.use { pool => - pool.allocated - } .assertFailsWith[ResourceLeak].flatMap { - case ResourceLeak(expected, actual, _) => - assert("expected 1 leakage", expected - actual == 1) - } - } - } - - test("unmoored fiber can cause a leak, which will be detected on finalization") { - ints.map(Pool.of(_, 3)(Recycler.success)).flatMap { factory => - factory.use { pool => - pool.use(_ => IO.never).start *> - IO.sleep(100.milli) // ensure that the fiber has a chance to run - } .assertFailsWith[ResourceLeak].flatMap { - case ResourceLeak(expected, actual, _) => - assert("expected 1 leakage", expected - actual == 1) - } - } - } - - // Concurrency tests below. These are nondeterministic and need a lot of exercise. 
- - val PoolSize = 10 - val ConcurrentTasks = 500 - - test("progress and safety with many fibers") { - ints.map(Pool.of(_, PoolSize)(Recycler.success)).flatMap { factory => - (1 to ConcurrentTasks).toList.parTraverse_{ _ => - factory.use { p => - p.use { _ => - for { - t <- IO(Random.nextInt() % 100) - _ <- IO.sleep(t.milliseconds) - } yield () - } - } - } - } - } - - test("progress and safety with many fibers and cancellation") { - ints.map(Pool.of(_, PoolSize)(Recycler.success)).flatMap { factory => - factory.use { pool => - (1 to ConcurrentTasks).toList.parTraverse_{_ => - for { - t <- IO(Random.nextInt() % 100) - f <- pool.use(_ => IO.sleep(t.milliseconds)).start - _ <- if (t > 50) f.join else f.cancel - } yield () - } - } - } - } - - test("progress and safety with many fibers and user failures") { - ints.map(Pool.of(_, PoolSize)(Recycler.success)).flatMap { factory => - factory.use { pool => - (1 to ConcurrentTasks).toList.parTraverse_{ _ => - pool.use { _ => - for { - t <- IO(Random.nextInt() % 100) - _ <- IO.sleep(t.milliseconds) - _ <- IO.raiseError(UserFailure()).whenA(t < 50) - } yield () - } .attempt // swallow errors so we don't fail fast - } - } - } - } - - test("progress and safety with many fibers and allocation failures") { - val alloc = IO(Random.nextBoolean()).flatMap { - case true => IO.unit - case false => IO.raiseError(AllocFailure()) - } - val rsrc = Resource.make(alloc)(_ => IO.unit) - Pool.of(rsrc, PoolSize)(Recycler.success).use { pool => - (1 to ConcurrentTasks).toList.parTraverse_{ _ => - pool.use { _ => - IO.unit - } .attempt - } - } - } - - test("progress and safety with many fibers and freeing failures") { - val free = IO(Random.nextBoolean()).flatMap { - case true => IO.unit - case false => IO.raiseError(FreeFailure()) - } - val rsrc = Resource.make(IO.unit)(_ => free) - Pool.of(rsrc, PoolSize)(Recycler.success).use { pool => - (1 to ConcurrentTasks).toList.parTraverse_{ _ => - pool.use { _ => - IO.unit - } .attempt - } - } .handleErrorWith { - // cleanup here may raise an exception, so we need to handle that - case FreeFailure() => IO.unit - } - } - - test("progress and safety with many fibers and reset failures") { - val recycle = IO(Random.nextInt(3)).flatMap { - case 0 => true.pure[IO] - case 1 => false.pure[IO] - case 2 => IO.raiseError(ResetFailure()) - } - val rsrc = Resource.make(IO.unit)(_ => IO.unit) - Pool.of(rsrc, PoolSize)(Recycler(_ => recycle)).use { pool => - (1 to ConcurrentTasks).toList.parTraverse_{ _ => - pool.use { _ => - IO.unit - } handleErrorWith { - case ResetFailure() => IO.unit - } - } - } - } - -} \ No newline at end of file
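
Example (a sketch, not part of the patch): checking a session out of the new pool-party-backed pool. The `Session.pooled` signature above is real; the host, credentials, and query below are placeholders, and `natchez.Trace.Implicits.noop` (as used by the deleted PoolTest) stands in for real tracing.

    import cats.effect.{ IO, IOApp, Resource }
    import natchez.Trace.Implicits.noop // no-op tracing, as in the old PoolTest
    import skunk.Session
    import skunk.implicits._
    import skunk.codec.all.int4

    object PooledExample extends IOApp.Simple {

      // Outer Resource manages the pool; inner Resource checks sessions out and
      // back in. debug = true routes every pool event through PoolReporters.default.
      val pool: Resource[IO, Resource[IO, Session[IO]]] =
        Session.pooled[IO](
          host     = "localhost",
          user     = "postgres",     // placeholder credentials
          database = "world",
          password = Some("banana"),
          max      = 8,
          debug    = true
        )

      def run: IO[Unit] =
        pool.use { sessions =>       // acquire the pool once, at startup
          sessions.use { s =>        // borrow a session; return it on release
            s.unique(sql"select 42".query(int4)).flatMap(n => IO.println(n))
          }
        }
    }

Note that the pool is still a `Resource` yielding a `Resource`, so call sites are unchanged; only the machinery underneath (and its failure reporting) is new.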
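
Also a sketch: the reporter is just a function `PoolEvent[Session[F]] => F[Unit]`, so the "more configurable" TODO in `PoolReporters.default` could be met by letting callers pass their own. Below, `failuresOnly` is a hypothetical quieter reporter covering the same failure cases the default handles, and `customPool` mirrors the builder wiring that `pooled` now performs internally; pool-party's exact builder API is assumed from this diff, and `Session.Recyclers` is assumed to be accessible to the caller.

    import cats.Applicative
    import cats.effect.{ Resource, Temporal }
    import cats.effect.std.Console
    import org.tpolecat.poolparty.{ PoolEvent, PooledResourceBuilder }
    import org.tpolecat.poolparty.PoolEvent.{ FinalizerFailure, HealthCheckFailure }
    import skunk.Session

    // Skip routine pool traffic; print only the exceptions that would otherwise
    // be lost on a worker thread (the cases PoolReporters.default cares about).
    def failuresOnly[F[_]: Console: Applicative]: PoolEvent[Session[F]] => F[Unit] = {
      case FinalizerFailure(_, _, _, ex)   => Console[F].printStackTrace(ex)
      case HealthCheckFailure(_, _, _, ex) => Console[F].printStackTrace(ex)
      case _                               => Applicative[F].unit
    }

    // Hypothetical wiring, mirroring the change to `pooled` above; `sessions`
    // stands for the per-connection Resource that `pooled` builds internally.
    def customPool[F[_]: Temporal: Console](
      sessions: Resource[F, Session[F]],
      max:      Int
    ): Resource[F, Resource[F, Session[F]]] =
      PooledResourceBuilder.of(sessions, max)
        .withHealthCheck(Session.Recyclers.full[F].run)
        .withReporter(failuresOnly[F])
        .build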