Skip to content

Commit 09b4fbb

Browse files
Add Fiber::ExecutionContext::SingleThreaded scheduler (#15511)
Introduces the first EC scheduler that runs in a single thread. Uses the same queues (`Runnables`, `GlobalQueue`) as the multithreaded scheduler that will come next. The `Runnables` local queue could be simplified (no parallel accesses, hence no need for atomics) at the expense of duplicating the implementation. The scheduler doesn't need to actively park the thread; the event loops always block (when told to) even when there are no events, which acts as parking the thread. NOTE: we can start running the specs in this context (they pass locally), though there might be some issues until we replace rogue `Thread` instances with bubble wrapped `ExecutionContext::Isolated` (coming in another PR). If you want to try: ```console $ make std_spec FLAGS="-Dpreview_mt -Dexecution_context" $ make compiler_spec FLAGS="-Dpreview_mt -Dexecution_context" ``` Co-authored-by: Johannes Müller <[email protected]>
1 parent eec49b9 commit 09b4fbb

File tree

3 files changed

+278
-4
lines changed

3 files changed

+278
-4
lines changed

spec/std/thread_spec.cr

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ pending_interpreted describe: Thread do
4444
end
4545

4646
it "names the thread" do
47-
Thread.current.name.should be_nil
48-
name = nil
47+
{% if flag?(:execution_context) %}
48+
Thread.current.name.should eq("DEFAULT")
49+
{% else %}
50+
Thread.current.name.should be_nil
51+
{% end %}
4952

53+
name = nil
5054
thread = Thread.new(name: "some-name") do
5155
name = Thread.current.name
5256
end

src/fiber/execution_context.cr

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ require "../crystal/system/thread"
33
require "../crystal/system/thread_linked_list"
44
require "../fiber"
55
require "./stack_pool"
6-
require "./execution_context/scheduler"
6+
require "./execution_context/*"
77

88
{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}
99
{% raise "ERROR: execution contexts require the `execution_context` compilation flag" unless flag?(:execution_context) %}
@@ -20,7 +20,7 @@ module Fiber::ExecutionContext
2020

2121
# :nodoc:
2222
def self.init_default_context : Nil
23-
raise NotImplementedError.new("No execution context implementations (yet)")
23+
@@default = SingleThreaded.default
2424
end
2525

2626
# Returns the default number of workers to start in the execution context.
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
require "./global_queue"
2+
require "./runnables"
3+
require "./scheduler"
4+
5+
module Fiber::ExecutionContext
6+
# ST scheduler. Owns a single thread.
7+
#
8+
# Fully concurrent with limited parallelism: concurrency is restricted to this
9+
# single thread. Fibers running in this context will never run in parallel to
10+
# each other but they may still run in parallel to fibers running in other
11+
# contexts (i.e. another thread).
12+
class SingleThreaded
13+
include ExecutionContext
14+
include ExecutionContext::Scheduler
15+
16+
getter name : String
17+
18+
protected getter thread : Thread
19+
@main_fiber : Fiber
20+
21+
@mutex : Thread::Mutex
22+
@global_queue : GlobalQueue
23+
@runnables : Runnables(256)
24+
25+
getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
26+
getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
27+
28+
@waiting = Atomic(Bool).new(false)
29+
@parked = Atomic(Bool).new(false)
30+
@spinning = Atomic(Bool).new(false)
31+
@tick : Int32 = 0
32+
33+
# :nodoc:
34+
protected def self.default : self
35+
new("DEFAULT", hijack: true)
36+
end
37+
38+
def self.new(name : String) : self
39+
new(name, hijack: false)
40+
end
41+
42+
protected def initialize(@name : String, hijack : Bool)
43+
@mutex = Thread::Mutex.new
44+
@global_queue = GlobalQueue.new(@mutex)
45+
@runnables = Runnables(256).new(@global_queue)
46+
47+
@thread = uninitialized Thread
48+
@main_fiber = uninitialized Fiber
49+
@thread = hijack ? hijack_current_thread : start_thread
50+
51+
ExecutionContext.execution_contexts.push(self)
52+
end
53+
54+
# :nodoc:
55+
def execution_context : self
56+
self
57+
end
58+
59+
def stack_pool? : Fiber::StackPool?
60+
@stack_pool
61+
end
62+
63+
# Initializes the scheduler on the current thread (usually the process'
64+
# main thread).
65+
private def hijack_current_thread : Thread
66+
thread = Thread.current
67+
thread.internal_name = @name
68+
thread.execution_context = self
69+
thread.scheduler = self
70+
@main_fiber = Fiber.new("#{@name}:loop", self) { run_loop }
71+
thread
72+
end
73+
74+
# Creates a new thread to initialize the scheduler.
75+
private def start_thread : Thread
76+
Thread.new(name: @name) do |thread|
77+
thread.execution_context = self
78+
thread.scheduler = self
79+
@main_fiber = thread.main_fiber
80+
@main_fiber.name = "#{@name}:loop"
81+
run_loop
82+
end
83+
end
84+
85+
# :nodoc:
86+
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
87+
# whatever the value of same_thread: the fibers will always run on the
88+
# same thread
89+
self.spawn(name: name, &block)
90+
end
91+
92+
def enqueue(fiber : Fiber) : Nil
93+
if ExecutionContext.current == self
94+
# local enqueue
95+
Crystal.trace :sched, "enqueue", fiber: fiber
96+
@runnables.push(fiber)
97+
else
98+
# cross context enqueue
99+
Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
100+
@global_queue.push(fiber)
101+
wake_scheduler
102+
end
103+
end
104+
105+
protected def reschedule : Nil
106+
Crystal.trace :sched, "reschedule"
107+
if fiber = quick_dequeue?
108+
resume fiber unless fiber == @thread.current_fiber
109+
else
110+
# nothing to do: switch back to the main loop to spin/park
111+
resume @main_fiber
112+
end
113+
end
114+
115+
protected def resume(fiber : Fiber) : Nil
116+
unless fiber.resumable?
117+
if fiber.dead?
118+
raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})"
119+
else
120+
raise "BUG: can't resume running fiber #{fiber} (#{inspect})"
121+
end
122+
end
123+
swapcontext(fiber)
124+
end
125+
126+
private def quick_dequeue? : Fiber?
127+
# every once in a while: dequeue from global queue to avoid two fibers
128+
# constantly respawing each other to completely occupy the local queue
129+
if (@tick &+= 1) % 61 == 0
130+
if fiber = @global_queue.pop?
131+
return fiber
132+
end
133+
end
134+
135+
# try local queue
136+
if fiber = @runnables.shift?
137+
return fiber
138+
end
139+
140+
# try to refill local queue
141+
if fiber = @global_queue.grab?(@runnables, divisor: 1)
142+
return fiber
143+
end
144+
145+
# run the event loop to see if any event is activable
146+
list = Fiber::List.new
147+
@event_loop.run(pointerof(list), blocking: false)
148+
return enqueue_many(pointerof(list))
149+
end
150+
151+
private def run_loop : Nil
152+
Crystal.trace :sched, "started"
153+
154+
loop do
155+
if fiber = find_next_runnable
156+
spin_stop if @spinning.get(:relaxed)
157+
resume fiber
158+
else
159+
# the event loop enqueued a fiber (or was interrupted) or the
160+
# scheduler was unparked: go for the next iteration
161+
end
162+
rescue exception
163+
Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed",
164+
self.class.name, @name, exception: exception)
165+
end
166+
end
167+
168+
private def find_next_runnable : Fiber?
169+
find_next_runnable do |fiber|
170+
return fiber if fiber
171+
end
172+
end
173+
174+
private def find_next_runnable(&) : Nil
175+
list = Fiber::List.new
176+
177+
# nothing to do: start spinning
178+
spinning do
179+
yield @global_queue.grab?(@runnables, divisor: 1)
180+
181+
@event_loop.run(pointerof(list), blocking: false)
182+
yield enqueue_many(pointerof(list))
183+
end
184+
185+
# block on the event loop, waiting for pending event(s) to activate
186+
waiting do
187+
# there is a time window between stop spinning and start waiting during
188+
# which another context can enqueue a fiber, check again before waiting
189+
# on the event loop to avoid missing a runnable fiber
190+
yield @global_queue.grab?(@runnables, divisor: 1)
191+
192+
@event_loop.run(pointerof(list), blocking: true)
193+
yield enqueue_many(pointerof(list))
194+
195+
# the event loop was interrupted: restart the run loop
196+
return
197+
end
198+
end
199+
200+
private def enqueue_many(list : Fiber::List*) : Fiber?
201+
if fiber = list.value.pop?
202+
@runnables.bulk_push(list) unless list.value.empty?
203+
fiber
204+
end
205+
end
206+
207+
private def spinning(&)
208+
spin_start
209+
210+
4.times do |attempt|
211+
Thread.yield unless attempt == 0
212+
yield
213+
end
214+
215+
spin_stop
216+
end
217+
218+
private def spin_start : Nil
219+
@spinning.set(true, :release)
220+
end
221+
222+
private def spin_stop : Nil
223+
@spinning.set(false, :release)
224+
end
225+
226+
private def waiting(&)
227+
@waiting.set(true, :release)
228+
begin
229+
yield
230+
ensure
231+
@waiting.set(false, :release)
232+
end
233+
end
234+
235+
# This method runs in parallel to the rest of the ST scheduler!
236+
#
237+
# This is called from another context _after_ enqueueing into the global
238+
# queue to try and wakeup the ST thread running in parallel that may be
239+
# running, spinning or waiting on the event loop.
240+
private def wake_scheduler : Nil
241+
if @spinning.get(:acquire)
242+
return
243+
end
244+
245+
if @waiting.get(:acquire)
246+
@event_loop.interrupt
247+
end
248+
end
249+
250+
def inspect(io : IO) : Nil
251+
to_s(io)
252+
end
253+
254+
def to_s(io : IO) : Nil
255+
io << "#<" << self.class.name << ":0x"
256+
object_id.to_s(io, 16)
257+
io << ' ' << name << '>'
258+
end
259+
260+
def status : String
261+
if @spinning.get(:relaxed)
262+
"spinning"
263+
elsif @waiting.get(:relaxed)
264+
"event-loop"
265+
else
266+
"running"
267+
end
268+
end
269+
end
270+
end

0 commit comments

Comments
 (0)