diff --git a/src/concurrent.cr b/src/concurrent.cr index 07ae945a84f6..53cbff1d3c67 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -1,8 +1,13 @@ require "fiber" require "channel" -require "crystal/scheduler" require "crystal/tracing" +{% if flag?(:execution_context) %} + require "fiber/execution_context" +{% else %} + require "crystal/scheduler" +{% end %} + # Blocks the current fiber for the specified number of seconds. # # While this fiber is waiting this time, other ready-to-execute @@ -12,8 +17,7 @@ def sleep(seconds : Number) : Nil if seconds < 0 raise ArgumentError.new "Sleep seconds must be positive" end - - Crystal::Scheduler.sleep(seconds.seconds) + sleep(seconds.seconds) end # Blocks the current Fiber for the specified time span. @@ -21,16 +25,28 @@ end # While this fiber is waiting this time, other ready-to-execute # fibers might start their execution. def sleep(time : Time::Span) : Nil - Crystal::Scheduler.sleep(time) + Crystal.trace :sched, "sleep", for: time + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(time) + Fiber::ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.sleep(time) + {% end %} end # Blocks the current fiber forever. # # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + Fiber::ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end +{% begin %} # Spawns a new fiber. # # NOTE: The newly created fiber doesn't run as soon as spawned. @@ -64,12 +80,17 @@ end # wg.wait # ``` def spawn(*, name : String? = nil, same_thread = false, &block) - fiber = Fiber.new(name, &block) - Crystal.trace :sched, "spawn", fiber: fiber - {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} - fiber.enqueue - fiber + {% if flag?(:execution_context) %} + Fiber::ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + {% else %} + fiber = Fiber.new(name, &block) + Crystal.trace :sched, "spawn", fiber: fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue + fiber + {% end %} end +{% end %} # Spawns a fiber by first creating a `Proc`, passing the *call*'s # expressions to it, and letting the `Proc` finally invoke the *call*. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 00bcb86040b6..e4f7d4d577be 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self - Crystal::Scheduler.event_loop + {% if flag?(:execution_context) %} + Fiber::ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop + {% end %} end @[AlwaysInline] - def self.current? : self? - Crystal::Scheduler.event_loop? + def self.current? : self | Nil + {% if flag?(:execution_context) %} + Fiber::ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop? + {% end %} end # Runs the loop. @@ -46,6 +54,13 @@ abstract class Crystal::EventLoop # events. abstract def run(blocking : Bool) : Bool + {% if flag?(:execution_context) %} + # Same as `#run` but collects runnable fibers into *queue* instead of + # enqueueing in parallel, so the caller is responsible and in control for + # when and how the fibers will be enqueued. + abstract def run(queue : Fiber::List*, blocking : Bool) : Nil + {% end %} + # Tells a blocking run loop to no longer wait for events to activate. It may # for example enqueue a NOOP event with an immediate (or past) timeout. Having # activated an event, the loop shall return, allowing the blocked thread to diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index da827079312a..16f74bd5859e 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop iocp end + # thread unsafe def run(blocking : Bool) : Bool enqueued = false @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop enqueued end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::List*, blocking : Bool) : Nil + run_impl(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. private def run_impl(blocking : Bool, &) : Nil diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 636d01331624..5dc5abf4f42e 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop(flags) end + {% if flag?(:execution_context) %} + def run(queue : Fiber::List*, blocking : Bool) : Nil + Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking + @runnables = queue + run(blocking) + ensure + @runnables = nil + end + + def callback_enqueue(fiber : Fiber) : Nil + if queue = @runnables + queue.value.push(fiber) + else + raise "BUG: libevent callback executed outside of #run(queue*, blocking) call" + end + end + {% end %} + def interrupt : Nil event_base.loop_exit end - # Create a new resume event for a fiber. + # Create a new resume event for a fiber (sleep). def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| - data.as(Fiber).enqueue + f = data.as(Fiber) + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} end end - # Creates a timeout_event. + # Creates a timeout event (timeout action of select expression). def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| f = data.as(Fiber) - if (select_action = f.timeout_select_action) + if select_action = f.timeout_select_action f.timeout_select_action = nil - select_action.time_expired(f) - else - f.enqueue + if select_action.time_expired? + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end end end end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 4df9eff7bc8e..fad5e2f64375 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -112,14 +112,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end {% end %} - # NOTE: thread unsafe + # thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end true end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::List*, blocking : Bool) : Nil + system_run(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # fiber interface, see Crystal::EventLoop def create_resume_event(fiber : Fiber) : FiberEvent @@ -327,13 +338,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index 51494fa2944b..ca84f1c86665 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,3 +1,5 @@ +{% skip_file if flag?(:execution_context) %} + require "crystal/event_loop" require "crystal/system/print_error" require "fiber" @@ -66,7 +68,6 @@ class Crystal::Scheduler end def self.sleep(time : Time::Span) : Nil - Crystal.trace :sched, "sleep", for: time Thread.current.scheduler.sleep(time) end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 878a27e4c578..9a9494ea05f3 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -79,6 +79,47 @@ class Thread getter name : String? + {% if flag?(:execution_context) %} + # :nodoc: + getter! execution_context : Fiber::ExecutionContext + + # :nodoc: + property! scheduler : Fiber::ExecutionContext::Scheduler + + # :nodoc: + def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext + main_fiber.execution_context = execution_context + end + + # When a fiber terminates we can't release its stack until we swap context + # to another fiber. We can't free/unmap nor push it to a shared stack pool, + # that would result in a segfault. + @dead_fiber_stack : Fiber::Stack? + + # :nodoc: + def dying_fiber(fiber : Fiber) : Fiber::Stack? + stack = @dead_fiber_stack + @dead_fiber_stack = fiber.@stack + stack + end + + # :nodoc: + def dead_fiber_stack? : Fiber::Stack? + if stack = @dead_fiber_stack + @dead_fiber_stack = nil + stack + end + end + {% else %} + # :nodoc: + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } + + # :nodoc: + def scheduler? : ::Crystal::Scheduler? + @scheduler + end + {% end %} + def self.unsafe_each(&) # nothing to iterate when @@threads is nil + don't lazily allocate in a # method called from a GC collection callback! @@ -165,14 +206,6 @@ class Thread thread.name = name end - # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } - - # :nodoc: - def scheduler? : ::Crystal::Scheduler? - @scheduler - end - protected def start Thread.threads.push(self) Thread.current = self diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index d9508eda85a8..ab88f841e67f 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -81,6 +81,16 @@ module Crystal write value.name || '?' end + {% if flag?(:execution_context) %} + def write(value : Fiber::ExecutionContext) : Nil + write value.name + end + + def write(value : Fiber::ExecutionContext::Scheduler) : Nil + write value.name + end + {% end %} + def write(value : Pointer) : Nil write "0x" System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) } diff --git a/src/fiber.cr b/src/fiber.cr index a7282047e165..0e72b1236075 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -67,7 +67,10 @@ class Fiber property name : String? @alive = true - {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread = Atomic(Thread?).new(nil) + {% end %} # :nodoc: property next : Fiber? @@ -75,6 +78,10 @@ class Fiber # :nodoc: property previous : Fiber? + {% if flag?(:execution_context) %} + property! execution_context : ExecutionContext + {% end %} + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) @@ -92,24 +99,27 @@ class Fiber fibers.each { |fiber| yield fiber } end + {% begin %} # Creates a new `Fiber` instance. # # When the fiber is executed, it runs *proc* in its context. # # *name* is an optional and used only as an internal reference. - def self.new(name : String? = nil, &proc : ->) + def self.new(name : String? = nil, {% if flag?(:execution_context) %}execution_context : ExecutionContext = ExecutionContext.current,{% end %} &proc : ->) : self stack = {% if flag?(:interpreted) %} # the interpreter is managing the stacks Stack.new(Pointer(Void).null, Pointer(Void).null) + {% elsif flag?(:execution_context) %} + execution_context.stack_pool.checkout {% else %} Crystal::Scheduler.stack_pool.checkout {% end %} - new(name, stack, &proc) + new(name, stack, {% if flag?(:execution_context) %}execution_context,{% end %} &proc) end # :nodoc: - def initialize(@name : String?, @stack : Stack, &@proc : ->) + def initialize(@name : String?, @stack : Stack, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->) @context = Context.new fiber_main = ->(f : Fiber) { f.run } @@ -118,6 +128,7 @@ class Fiber Fiber.fibers.push(self) end + {% end %} # :nodoc: def initialize(stack : Void*, thread) @@ -137,13 +148,22 @@ class Fiber @stack = Stack.new(stack, stack_bottom) @name = "main" - {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread.set(thread) + {% end %} + Fiber.fibers.push(self) + + # we don't initialize @execution_context here (we may not have an execution + # context yet), and we can't detect ExecutionContext.current (we may reach + # an infinite recursion). end # :nodoc: def run GC.unlock_read + @proc.call rescue ex if name = @name @@ -165,9 +185,21 @@ class Fiber @exec_recursive_clone_hash = nil @alive = false + + # the interpreter is managing the stacks {% unless flag?(:interpreted) %} - Crystal::Scheduler.stack_pool.release(@stack) + {% if flag?(:execution_context) %} + # do not prematurely release the stack before we switch to another fiber + if stack = Thread.current.dying_fiber(self) + # we can however release the stack of a previously dying fiber (we + # since swapped context) + execution_context.stack_pool.release(stack) + end + {% else %} + Crystal::Scheduler.stack_pool.release(@stack) + {% end %} {% end %} + Fiber.suspend end @@ -209,7 +241,11 @@ class Fiber # puts "never reached" # ``` def resume : Nil - Crystal::Scheduler.resume(self) + {% if flag?(:execution_context) %} + ExecutionContext.resume(self) + {% else %} + Crystal::Scheduler.resume(self) + {% end %} end # Adds this fiber to the scheduler's runnables queue for the current thread. @@ -218,7 +254,11 @@ class Fiber # the next time it has the opportunity to reschedule to another fiber. There # are no guarantees when that will happen. def enqueue : Nil - Crystal::Scheduler.enqueue(self) + {% if flag?(:execution_context) %} + execution_context.enqueue(self) + {% else %} + Crystal::Scheduler.enqueue(self) + {% end %} end # :nodoc: @@ -286,7 +326,14 @@ class Fiber # end # ``` def self.yield : Nil - Crystal::Scheduler.yield + Crystal.trace :sched, "yield" + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(0.seconds) + Fiber.suspend + {% else %} + Crystal::Scheduler.yield + {% end %} end # Suspends execution of the current fiber indefinitely. @@ -300,7 +347,11 @@ class Fiber # useful if the fiber needs to wait for something to happen (for example an IO # event, a message is ready in a channel, etc.) which triggers a re-enqueue. def self.suspend : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end def to_s(io : IO) : Nil @@ -322,7 +373,7 @@ class Fiber GC.push_stack @context.stack_top, @stack.bottom end - {% if flag?(:preview_mt) %} + {% if flag?(:preview_mt) && !flag?(:execution_context) %} # :nodoc: def set_current_thread(thread = Thread.current) : Thread @current_thread.set(thread) diff --git a/src/fiber/execution_context.cr b/src/fiber/execution_context.cr new file mode 100644 index 000000000000..0f9eb4d7bcd9 --- /dev/null +++ b/src/fiber/execution_context.cr @@ -0,0 +1,117 @@ +require "../crystal/event_loop" +require "../crystal/system/thread" +require "../crystal/system/thread_linked_list" +require "../fiber" +require "./stack_pool" +require "./execution_context/scheduler" + +{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} +{% raise "ERROR: execution contexts require the `execution_context` compilation flag" unless flag?(:execution_context) %} + +module Fiber::ExecutionContext + @@default : ExecutionContext? + + # Returns the default `ExecutionContext` for the process, automatically + # started when the program started. + @[AlwaysInline] + def self.default : ExecutionContext + @@default.not_nil!("expected default execution context to have been setup") + end + + # :nodoc: + def self.init_default_context : Nil + raise NotImplementedError.new("No execution context implementations (yet)") + end + + # Returns the default number of workers to start in the execution context. + def self.default_workers_count : Int32 + ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32) + end + + # :nodoc: + protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new } + + # :nodoc: + property next : ExecutionContext? + + # :nodoc: + property previous : ExecutionContext? + + # :nodoc: + def self.unsafe_each(&) : Nil + @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context }) + end + + # Iterates all execution contexts. + def self.each(&) : Nil + execution_contexts.each { |execution_context| yield execution_context } + end + + # Returns the `ExecutionContext` the current fiber is running in. + @[AlwaysInline] + def self.current : ExecutionContext + Thread.current.execution_context + end + + # :nodoc: + # + # Tells the current scheduler to suspend the current fiber and resume the + # next runnable fiber. The current fiber will never be resumed; you're + # responsible to reenqueue it. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + @[AlwaysInline] + def self.reschedule : Nil + Scheduler.current.reschedule + end + + # :nodoc: + # + # Tells the current scheduler to suspend the current fiber and to resume + # *fiber* instead. The current fiber will never be resumed; you're responsible + # to reenqueue it. + # + # Raises `RuntimeError` if the fiber doesn't belong to the current execution + # context. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + def self.resume(fiber : Fiber) : Nil + if fiber.execution_context == current + Scheduler.current.resume(fiber) + else + raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}") + end + end + + # Creates a new fiber then calls `#enqueue` to add it to the execution + # context. + # + # May be called from any `ExecutionContext` (i.e. must be thread-safe). + def spawn(*, name : String? = nil, &block : ->) : Fiber + Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) } + end + + # :nodoc: + # + # Legacy support for the `same_thread` argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + # :nodoc: + abstract def stack_pool : Fiber::StackPool + + # :nodoc: + abstract def stack_pool? : Fiber::StackPool? + + # :nodoc: + abstract def event_loop : Crystal::EventLoop + + # :nodoc: + # + # Enqueues a fiber to be resumed inside the execution context. + # + # May be called from any ExecutionContext (i.e. must be thread-safe). + abstract def enqueue(fiber : Fiber) : Nil +end diff --git a/src/fiber/execution_context/scheduler.cr b/src/fiber/execution_context/scheduler.cr new file mode 100644 index 000000000000..c561fcf7e2ea --- /dev/null +++ b/src/fiber/execution_context/scheduler.cr @@ -0,0 +1,72 @@ +module Fiber::ExecutionContext + # :nodoc: + module Scheduler + @[AlwaysInline] + def self.current : Scheduler + Thread.current.scheduler + end + + protected abstract def thread : Thread + protected abstract def execution_context : ExecutionContext + + # Instantiates a fiber and enqueues it into the scheduler's local queue. + def spawn(*, name : String? = nil, &block : ->) : Fiber + fiber = Fiber.new(name, execution_context, &block) + enqueue(fiber) + fiber + end + + # Legacy support for the *same_thread* argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + # Suspends the current fiber and resumes *fiber* instead. + # The current fiber will never be resumed; you're responsible to reenqueue it. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.enqueue` instead. + protected abstract def enqueue(fiber : Fiber) : Nil + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def reschedule : Nil + + # Suspends the execution of the current fiber and resumes *fiber*. + # + # The current fiber will never be resumed; you're responsible to reenqueue + # it. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.resume` instead. + protected abstract def resume(fiber : Fiber) : Nil + + # Switches the thread from running the current fiber to run *fiber* instead. + # + # Handles thread safety around fiber stacks by locking the GC, so it won't + # start a GC collection while we're switching context. + # + # Unsafe. Must only be called by the current scheduler. The caller must + # ensure that the fiber indeed belongs to the current execution context and + # that the fiber can indeed be resumed. + # + # WARNING: after switching context you can't trust *self* anymore (it is the + # scheduler that resumed *fiber* which may not be the one that suspended + # *fiber*) or instance variables; local variables however are the ones from + # before swapping context. + protected def swapcontext(fiber : Fiber) : Nil + current_fiber = thread.current_fiber + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + end + + # Returns the current status of the scheduler. For example `"running"`, + # `"event-loop"` or `"parked"`. + abstract def status : String + end +end diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 04954de40a94..4a36f47f4abd 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -5,6 +5,11 @@ class Fiber class StackPool STACK_SIZE = 8 * 1024 * 1024 + {% if flag?(:execution_context) %} + # must explicitly declare the variable because of the macro in #initialize + @lock = uninitialized Crystal::SpinLock + {% end %} + # If *protect* is true, guards all top pages (pages with the lowest address # values) in the allocated stacks; accessing them triggers an error # condition, allowing stack overflows on non-main fibers to be detected. @@ -13,6 +18,10 @@ class Fiber # pointer value) rather than downwards, so *protect* must be false. def initialize(@protect : Bool = true) @deque = Deque(Stack).new + + {% if flag?(:execution_context) %} + @lock = Crystal::SpinLock.new + {% end %} end def finalize @@ -25,7 +34,7 @@ class Fiber # returning memory to the operating system. def collect(count = lazy_size // 2) : Nil count.times do - if stack = @deque.shift? + if stack = shift? Crystal::System::Fiber.free_stack(stack.pointer, STACK_SIZE) else return @@ -42,7 +51,7 @@ class Fiber # Removes a stack from the bottom of the pool, or allocates a new one. def checkout : Stack - if stack = @deque.pop? + if stack = pop? Crystal::System::Fiber.reset_stack(stack.pointer, STACK_SIZE, @protect) stack else @@ -53,7 +62,13 @@ class Fiber # Appends a stack to the bottom of the pool. def release(stack : Stack) : Nil - @deque.push(stack) if stack.reusable? + return unless stack.reusable? + + {% if flag?(:execution_context) %} + @lock.sync { @deque.push(stack) } + {% else %} + @deque.push(stack) + {% end %} end # Returns the approximated size of the pool. It may be equal or slightly @@ -61,5 +76,25 @@ class Fiber def lazy_size : Int32 @deque.size end + + private def shift? + {% if flag?(:execution_context) %} + @lock.sync { @deque.shift? } unless @deque.empty? + {% else %} + @deque.shift? + {% end %} + end + + private def pop? + {% if flag?(:execution_context) %} + if (stack = Thread.current.dead_fiber_stack?) && stack.reusable? + stack + else + @lock.sync { @deque.pop? } unless @deque.empty? + end + {% else %} + @deque.pop? + {% end %} + end end end diff --git a/src/io/evented.cr b/src/io/evented.cr index 635c399d9239..8eefa5ed35bc 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -20,7 +20,12 @@ module IO::Evented @read_timed_out = timed_out if reader = @readers.get?.try &.shift? - reader.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + reader.enqueue + {% end %} end end @@ -29,7 +34,12 @@ module IO::Evented @write_timed_out = timed_out if writer = @writers.get?.try &.shift? - writer.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + writer.enqueue + {% end %} end end @@ -79,11 +89,19 @@ module IO::Evented @write_event.consume_each &.free @readers.consume_each do |readers| - Crystal::Scheduler.enqueue readers + {% if flag?(:execution_context) %} + readers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue readers + {% end %} end @writers.consume_each do |writers| - Crystal::Scheduler.enqueue writers + {% if flag?(:execution_context) %} + writers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue writers + {% end %} end end diff --git a/src/kernel.cr b/src/kernel.cr index 34763b994839..054705c55e5b 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -608,7 +608,11 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1" Exception::CallStack.setup_crash_handler - Crystal::Scheduler.init + {% if flag?(:execution_context) %} + Fiber::ExecutionContext.init_default_context + {% else %} + Crystal::Scheduler.init + {% end %} {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop diff --git a/src/raise.cr b/src/raise.cr index 1ba0243def28..7ebb18f1320e 100644 --- a/src/raise.cr +++ b/src/raise.cr @@ -1,5 +1,6 @@ require "c/stdio" require "c/stdlib" +require "crystal/system/print_error" require "exception/call_stack" Exception::CallStack.skip(__FILE__)