diff --git a/spec/std/fiber/execution_context/multi_threaded_spec.cr b/spec/std/fiber/execution_context/multi_threaded_spec.cr new file mode 100644 index 000000000000..96b2b9e23274 --- /dev/null +++ b/spec/std/fiber/execution_context/multi_threaded_spec.cr @@ -0,0 +1,42 @@ +{% skip_file unless flag?(:execution_context) %} +require "spec" + +describe Fiber::ExecutionContext::MultiThreaded do + it ".new" do + mt = Fiber::ExecutionContext::MultiThreaded.new("test", maximum: 2) + mt.size.should eq(0) + mt.capacity.should eq(2) + + expect_raises(ArgumentError, "needs at least one thread") do + Fiber::ExecutionContext::MultiThreaded.new("test", maximum: -1) + end + + expect_raises(ArgumentError, "needs at least one thread") do + Fiber::ExecutionContext::MultiThreaded.new("test", maximum: 0) + end + + mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 0..2) + mt.size.should eq(0) + mt.capacity.should eq(2) + + mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: ..4) + mt.size.should eq(0) + mt.capacity.should eq(4) + + mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 1..5) + mt.size.should eq(1) + mt.capacity.should eq(5) + + mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 1...5) + mt.size.should eq(1) + mt.capacity.should eq(4) + + expect_raises(ArgumentError, "needs at least one thread") do + Fiber::ExecutionContext::MultiThreaded.new("test", size: 0...1) + end + + expect_raises(ArgumentError, "invalid range") do + Fiber::ExecutionContext::MultiThreaded.new("test", size: 5..1) + end + end +end diff --git a/spec/std/spec_helper.cr b/spec/std/spec_helper.cr index f0c03d05ece5..54bdbdf8b11f 100644 --- a/spec/std/spec_helper.cr +++ b/spec/std/spec_helper.cr @@ -43,7 +43,7 @@ def spawn_and_check(before : Proc(_), file = __FILE__, line = __LINE__, &block : # This is a workaround to ensure the "before" fiber # is unscheduled. Otherwise it might stay alive running the event loop - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context)}}) do while x.get != 2 Fiber.yield end diff --git a/spec/support/mt_abort_timeout.cr b/spec/support/mt_abort_timeout.cr index 7339da6c07a1..84c171ab605f 100644 --- a/spec/support/mt_abort_timeout.cr +++ b/spec/support/mt_abort_timeout.cr @@ -5,7 +5,7 @@ private SPEC_TIMEOUT = 15.seconds Spec.around_each do |example| done = Channel(Exception?).new - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context)}}) do begin example.run rescue e diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 9a9494ea05f3..26559d520357 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -179,6 +179,18 @@ class Thread Crystal::System::Thread.sleep(time) end + # Delays execution for a brief moment. + @[NoInline] + def self.delay(backoff : Int32) : Int32 + if backoff < 7 + backoff.times { Intrinsics.pause } + backoff &+ 1 + else + Thread.yield + 0 + end + end + # Returns the Thread object associated to the running system thread. def self.current : Thread Crystal::System::Thread.current_thread diff --git a/src/fiber/execution_context/multi_threaded.cr b/src/fiber/execution_context/multi_threaded.cr new file mode 100644 index 000000000000..04829ca67e4e --- /dev/null +++ b/src/fiber/execution_context/multi_threaded.cr @@ -0,0 +1,299 @@ +require "./global_queue" +require "./multi_threaded/scheduler" + +module Fiber::ExecutionContext + # MT scheduler. + # + # Owns multiple threads and starts a scheduler in each one. The number of + # threads is dynamic. 
Setting the minimum and maximum to the same value will + # start a fixed number of threads. + # + # Fully concurrent, fully parallel: fibers running in this context can be + # resumed by any thread in the context; fibers can run concurrently and in + # parallel to each other, in addition to running in parallel to every other + # fibers running in other contexts. + class MultiThreaded + include ExecutionContext + + getter name : String + + @mutex : Thread::Mutex + @condition : Thread::ConditionVariable + protected getter global_queue : GlobalQueue + + getter stack_pool : Fiber::StackPool = Fiber::StackPool.new + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + @event_loop_lock = Atomic(Bool).new(false) + + @parked = Atomic(Int32).new(0) + @spinning = Atomic(Int32).new(0) + @size : Range(Int32, Int32) + + # :nodoc: + protected def self.default(maximum : Int32) : self + new("DEFAULT", 1..maximum, hijack: true) + end + + # Starts a context with a *maximum* number of threads. Threads aren't started + # right away but will be started as needed to increase parallelism up to the + # configured maximum. + def self.new(name : String, maximum : Int32) : self + new(name, 0..maximum) + end + + # Starts a context with a *maximum* number of threads. Threads aren't started + # right away but will be started as needed to increase parallelism up to the + # configured maximum. + def self.new(name : String, size : Range(Nil, Int32)) : self + new(name, Range.new(0, size.end, size.exclusive?)) + end + + # Starts a context with a minimum and maximum number of threads. Only the + # minimum number of threads will be started right away. The minimum can be 0 + # (or nil) in which case no threads will be started. More threads will be + # started as needed to increase parallelism up to the configured maximum. + def self.new(name : String, size : Range(Int32, Int32)) : self + new(name, size, hijack: false) + end + + protected def initialize(@name : String, size : Range(Int32, Int32), hijack : Bool) + @size = + if size.exclusive? + (size.begin)..(size.end - 1) + else + size + end + raise ArgumentError.new("#{self.class.name} needs at least one thread") if capacity < 1 + raise ArgumentError.new("#{self.class.name} invalid range") if @size.begin > @size.end + + @mutex = Thread::Mutex.new + @condition = Thread::ConditionVariable.new + + @global_queue = GlobalQueue.new(@mutex) + @schedulers = Array(Scheduler).new(capacity) + @threads = Array(Thread).new(capacity) + + @rng = Random::PCG32.new + + start_schedulers + start_initial_threads(hijack) + + ExecutionContext.execution_contexts.push(self) + end + + # The number of threads that have been started. + def size : Int32 + @threads.size + end + + # The maximum number of threads that can be started. + def capacity : Int32 + @size.end + end + + def stack_pool? : Fiber::StackPool? + @stack_pool + end + + # Starts all schedulers at once. + # + # We could lazily initialize them as needed, like we do for threads, which + # would be safe as long as we only mutate when the mutex is locked... but + # unlike @threads, we do iterate the schedulers in #steal without locking + # the mutex (for obvious reasons) and there are no guarantees that the new + # schedulers.@size will be written after the scheduler has been written to + # the array's buffer. + # + # OPTIMIZE: consider storing schedulers to an array-like object that would + # use an atomic/fence to make sure that @size can only be incremented + # *after* the value has been written to @buffer. 
+ private def start_schedulers + capacity.times do |index| + @schedulers << Scheduler.new(self, "#{@name}-#{index}") + end + end + + private def start_initial_threads(hijack) + offset = 0 + + if hijack + @threads << hijack_current_thread(@schedulers[0]) + offset += 1 + end + + offset.upto(@size.begin - 1) do |index| + @threads << start_thread(@schedulers[index]) + end + end + + # Attaches *scheduler* to the current `Thread`, usually the process' main + # thread. Starts a `Fiber` to run the scheduler loop. + private def hijack_current_thread(scheduler) : Thread + thread = Thread.current + thread.internal_name = scheduler.name + thread.execution_context = self + thread.scheduler = scheduler + + scheduler.thread = thread + scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do + scheduler.run_loop + end + + thread + end + + # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop + # directly in the thread's main `Fiber`. + private def start_thread(scheduler) : Thread + Thread.new(name: scheduler.name) do |thread| + thread.execution_context = self + thread.scheduler = scheduler + + scheduler.thread = thread + scheduler.main_fiber = thread.main_fiber + scheduler.main_fiber.name = "#{scheduler.name}:loop" + scheduler.run_loop + end + end + + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + self.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + if ExecutionContext.current == self + # local enqueue: push to local queue of current scheduler + ExecutionContext::Scheduler.current.enqueue(fiber) + else + # cross context: push to global queue + Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self + @global_queue.push(fiber) + wake_scheduler + end + end + + # Picks a scheduler at random then iterates all schedulers to try to steal + # fibers from. + protected def steal(& : Scheduler ->) : Nil + return if size == 1 + + i = @rng.next_int + n = @schedulers.size + + n.times do |j| + if scheduler = @schedulers[(i &+ j) % n]? + yield scheduler + end + end + end + + protected def park_thread(&) : Fiber? + @mutex.synchronize do + # avoid races by checking queues again + if fiber = yield + return fiber + end + + Crystal.trace :sched, "park" + @parked.add(1, :acquire_release) + + @condition.wait(@mutex) + + # we don't decrement @parked because #wake_scheduler did + Crystal.trace :sched, "wakeup" + end + + nil + end + + # This method always runs in parallel! + # + # This can be called from any thread in the context but can also be called + # from external execution contexts, in which case the context may have its + # last thread about to park itself, and we must prevent the last thread from + # parking when there is a parallel cross context enqueue! + # + # OPTIMIZE: instead of blindly spending time (blocking progress on the + # current thread) to unpark a thread / start a new thread we could move the + # responsibility to an external observer to increase parallelism in a MT + # context when it detects pending work. 
+ protected def wake_scheduler : Nil + # another thread is spinning: nothing to do (it shall notice the enqueue) + if @spinning.get(:relaxed) > 0 + return + end + + # interrupt a thread waiting on the event loop + if @event_loop_lock.get(:relaxed) + @event_loop.interrupt + return + end + + # we can check @parked without locking the mutex because we can't push to + # the global queue _and_ park the thread at the same time, so either the + # thread is already parked (and we must awake it) or it noticed (or will + # notice) the fiber in the global queue; + # + # we still rely on an atomic to make sure the actual value is visible by + # the current thread + if @parked.get(:acquire) > 0 + @mutex.synchronize do + # avoid race conditions + return if @parked.get(:relaxed) == 0 + return if @spinning.get(:relaxed) > 0 + + # increase the number of spinning threads _now_ to avoid multiple + # threads from trying to wakeup multiple threads at the same time + # + # we must also decrement the number of parked threads because another + # thread could lock the mutex and increment @spinning again before the + # signaled thread is resumed + spinning = @spinning.add(1, :acquire_release) + parked = @parked.sub(1, :acquire_release) + + @condition.signal + end + return + end + + # check if we can start another thread; no need for atomics, the values + # shall be rather stable over time and we check them again inside the + # mutex + return if @threads.size == capacity + + @mutex.synchronize do + index = @threads.size + return if index == capacity # check again + + @threads << start_thread(@schedulers[index]) + end + end + + # Only one thread is allowed to run the event loop. Yields then returns true + # if the lock was acquired, otherwise returns false immediately. + protected def lock_evloop?(&) : Bool + if @event_loop_lock.swap(true, :acquire) == false + begin + yield + ensure + @event_loop_lock.set(false, :release) + end + true + else + false + end + end + + @[AlwaysInline] + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << name << '>' + end + end +end diff --git a/src/fiber/execution_context/multi_threaded/scheduler.cr b/src/fiber/execution_context/multi_threaded/scheduler.cr new file mode 100644 index 000000000000..5aadffaaa376 --- /dev/null +++ b/src/fiber/execution_context/multi_threaded/scheduler.cr @@ -0,0 +1,265 @@ +require "crystal/pointer_linked_list" +require "../scheduler" +require "../runnables" + +module Fiber::ExecutionContext + class MultiThreaded + # MT fiber scheduler. + # + # Owns a single thread inside a MT execution context. + class Scheduler + include ExecutionContext::Scheduler + + getter name : String + + # :nodoc: + property execution_context : MultiThreaded + protected property! thread : Thread + protected property! main_fiber : Fiber + + @global_queue : GlobalQueue + @runnables : Runnables(256) + @event_loop : Crystal::EventLoop + + @tick : Int32 = 0 + @spinning = false + @waiting = false + @parked = false + + protected def initialize(@execution_context, @name) + @global_queue = @execution_context.global_queue + @runnables = Runnables(256).new(@global_queue) + @event_loop = @execution_context.event_loop + end + + # :nodoc: + def spawn(*, name : String? 
= nil, same_thread : Bool, &block : ->) : Fiber + raise RuntimeError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + self.spawn(name: name, &block) + end + + # Unlike `ExecutionContext::MultiThreaded#enqueue` this method is only + # safe to call on `ExecutionContext.current` which should always be the + # case, since cross context enqueues must call + # `ExecutionContext::MultiThreaded#enqueue` through `Fiber#enqueue`. + protected def enqueue(fiber : Fiber) : Nil + Crystal.trace :sched, "enqueue", fiber: fiber + @runnables.push(fiber) + @execution_context.wake_scheduler unless @execution_context.capacity == 1 + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + if fiber = quick_dequeue? + resume fiber unless fiber == thread.current_fiber + else + # nothing to do: switch back to the main loop to spin/wait/park + resume main_fiber + end + end + + protected def resume(fiber : Fiber) : Nil + Crystal.trace :sched, "resume", fiber: fiber + + # in a multithreaded environment the fiber may be dequeued before its + # running context has been saved on the stack (thread A tries to resume + # fiber but thread B didn't saved its context yet); we must wait until + # the context switch assembly saved all registers on the stack and set + # the fiber as resumable. + attempts = 0 + + until fiber.resumable? + if fiber.dead? + raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})" + end + + # OPTIMIZE: if the thread saving the fiber context has been preempted, + # this will block the current thread from progressing... shall we + # abort and reenqueue the fiber after MAX attempts? + attempts = Thread.delay(attempts) + end + + swapcontext(fiber) + end + + private def quick_dequeue? : Fiber? + # every once in a while: dequeue from global queue to avoid two fibers + # constantly respawing each other to completely occupy the local queue + if (@tick &+= 1) % 61 == 0 + if fiber = @global_queue.pop? + return fiber + end + end + + # dequeue from local queue + if fiber = @runnables.shift? + return fiber + end + end + + protected def run_loop : Nil + Crystal.trace :sched, "started" + + loop do + if fiber = find_next_runnable + spin_stop if @spinning + resume fiber + else + # the event loop enqueued a fiber (or was interrupted) or the + # scheduler was unparked: go for the next iteration + end + rescue exception + Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed", + self.class.name, @name, exception: exception) + end + end + + private def find_next_runnable : Fiber? + find_next_runnable do |fiber| + return fiber if fiber + end + end + + private def find_next_runnable(&) : Nil + list = Fiber::List.new + + # nothing to do: start spinning + spinning do + yield @global_queue.grab?(@runnables, divisor: @execution_context.size) + + if @execution_context.lock_evloop? { @event_loop.run(pointerof(list), blocking: false) } + unless list.empty? + # must stop spinning before calling enqueue_many that may call + # wake_scheduler which returns immediately if a thread is + # spinning... but we're spinning, so that would always fail to + # wake sleeping schedulers despite having runnable fibers + spin_stop + yield enqueue_many(pointerof(list)) + end + end + + yield try_steal? + end + + # wait on the event loop for events and timers to activate + evloop_ran = @execution_context.lock_evloop? 
do + @waiting = true + + # there is a time window between stop spinning and start waiting + # during which another context may have enqueued a fiber, check again + # before blocking on the event loop to avoid missing a runnable fiber, + # which may block for a long time: + yield @global_queue.grab?(@runnables, divisor: @execution_context.size) + + # block on the event loop until an event is ready or the loop is + # interrupted + @event_loop.run(pointerof(list), blocking: true) + ensure + @waiting = false + end + + if evloop_ran + yield enqueue_many(pointerof(list)) + + # the event loop was interrupted: restart the loop + return + end + + # no runnable fiber and another thread is already running the event + # loop: park the thread until another scheduler or another context + # enqueues a fiber + @execution_context.park_thread do + # by the time we acquire the lock, another thread may have enqueued + # fiber(s) and already tried to wakeup a thread (race) so we must + # check again; we don't check the scheduler's local queue (it's empty) + + yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size) + yield try_steal? + + @parked = true + nil + end + @parked = false + + # immediately mark the scheduler as spinning (we just unparked); we + # don't increment the number of spinning threads since + # `MultiThreaded#wake_scheduler` already did + @spinning = true + end + + private def enqueue_many(list : Fiber::List*) : Fiber? + if fiber = list.value.pop? + Crystal.trace :sched, "enqueue", size: list.value.size, fiber: fiber + unless list.value.empty? + @runnables.bulk_push(list) + @execution_context.wake_scheduler unless @execution_context.capacity == 1 + end + fiber + end + end + + # This method always runs in parallel! + private def try_steal? : Fiber? + @execution_context.steal do |other| + if other == self + # no need to steal from ourselves + next + end + + if fiber = @runnables.steal_from(other.@runnables) + Crystal.trace :sched, "stole", from: other, size: @runnables.size, fiber: fiber + return fiber + end + end + end + + # OPTIMIZE: skip spinning if there are enough threads spinning already + private def spinning(&) + spin_start + + 4.times do |attempt| + Thread.yield unless attempt == 0 + yield + end + + spin_stop + end + + private def spin_start : Nil + return if @spinning + + @spinning = true + @execution_context.@spinning.add(1, :acquire_release) + end + + private def spin_stop : Nil + return unless @spinning + + @execution_context.@spinning.sub(1, :acquire_release) + @spinning = false + end + + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << @name << '>' + end + + def status : String + if @spinning + "spinning" + elsif @waiting + "event-loop" + elsif @parked + "parked" + else + "running" + end + end + end + end +end
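
A minimal usage sketch of the API added above (not part of the patch). It assumes the program is compiled with the execution_context flag that the specs check for, that the plain `spawn(name:, &)` overload comes from the included `ExecutionContext` module (as the `same_thread:` overload's delegation suggests), and that `Channel` delivers values across contexts; the names "workers" and "job-N" are purely illustrative.

# Start a context that may grow up to 4 threads; none are started yet.
mt = Fiber::ExecutionContext::MultiThreaded.new("workers", maximum: 4)
mt.size     # => 0  (threads started so far)
mt.capacity # => 4  (maximum number of threads)

# A 2..8 range would start 2 threads right away and allow growth up to 8:
# mt = Fiber::ExecutionContext::MultiThreaded.new("workers", size: 2..8)

results = Channel(Int32).new

8.times do |i|
  # Fibers spawned into the context may be resumed by any of its threads,
  # so they run in parallel to each other.
  mt.spawn(name: "job-#{i}") do
    results.send(i * i)
  end
end

8.times { puts results.receive }

The `Thread.delay` helper added in src/crystal/system/thread.cr encodes a bounded spin-then-yield backoff: while the argument is below 7 it issues that many CPU pause instructions and returns the argument plus one, otherwise it yields the thread and returns 0 so the caller starts pausing again. `Scheduler#resume` uses it while waiting for a fiber to become resumable; a generic sketch of the same pattern, with a hypothetical `ready?` predicate standing in for whatever condition is being awaited:

attempts = 0
until ready? # hypothetical condition we are spinning on
  attempts = Thread.delay(attempts)
end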