
Commit b83439c

RFC 2: Add Fiber::ExecutionContext::MultiThreaded (#15517)
Introduces an execution context that can run multiple threads with work stealing: any thread can resume any runnable fiber in the context, so no thread is left starving.

Unlike the ST scheduler, the MT scheduler needs to actively park threads in addition to waiting on the event loop, since only one thread in the context can run the event loop at a time (no parallel runs).

Having a single event loop for the whole context, instead of one per thread, avoids situations where fibers wait in an event loop but aren't processed because that particular thread happens to be busy, causing delays. With a single event loop, as soon as a thread is starving it can check the event loop and enqueue the runnable fibers, which can be resumed (and stolen) immediately.

NOTE: we can start running the specs in this context, though they sometimes segfault. This may be caused by spec helpers that used to assume fibers would never switch threads, or by issues in the stdlib for the same reason (for example libxml).
1 parent ff083d0 commit b83439c
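
For orientation, a rough usage sketch based on the constructors and the `#spawn` overload added in this commit. The names, the `size: 2..2` fixed pool, and the block-only `spawn(name:)` call (assumed to come from the included `ExecutionContext` module) are illustrative assumptions, not part of the commit:

# Hypothetical usage sketch (assumes the program is compiled with the
# execution_context flag; the block-only `spawn(name:)` overload is assumed
# to be provided by the included ExecutionContext module).
workers = Fiber::ExecutionContext::MultiThreaded.new("workers", maximum: 4) # 0..4 threads, grown on demand
fixed = Fiber::ExecutionContext::MultiThreaded.new("fixed", size: 2..2)     # exactly 2 threads, started right away

workers.spawn(name: "job") do
  # any of the context's threads may resume (or steal) this fiber;
  # spawn(same_thread: true) is rejected in this context
end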

File tree

6 files changed (+620, -2 lines)

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+{% skip_file unless flag?(:execution_context) %}
+require "spec"
+
+describe Fiber::ExecutionContext::MultiThreaded do
+  it ".new" do
+    mt = Fiber::ExecutionContext::MultiThreaded.new("test", maximum: 2)
+    mt.size.should eq(0)
+    mt.capacity.should eq(2)
+
+    expect_raises(ArgumentError, "needs at least one thread") do
+      Fiber::ExecutionContext::MultiThreaded.new("test", maximum: -1)
+    end
+
+    expect_raises(ArgumentError, "needs at least one thread") do
+      Fiber::ExecutionContext::MultiThreaded.new("test", maximum: 0)
+    end
+
+    mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 0..2)
+    mt.size.should eq(0)
+    mt.capacity.should eq(2)
+
+    mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: ..4)
+    mt.size.should eq(0)
+    mt.capacity.should eq(4)
+
+    mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 1..5)
+    mt.size.should eq(1)
+    mt.capacity.should eq(5)
+
+    mt = Fiber::ExecutionContext::MultiThreaded.new("test", size: 1...5)
+    mt.size.should eq(1)
+    mt.capacity.should eq(4)
+
+    expect_raises(ArgumentError, "needs at least one thread") do
+      Fiber::ExecutionContext::MultiThreaded.new("test", size: 0...1)
+    end
+
+    expect_raises(ArgumentError, "invalid range") do
+      Fiber::ExecutionContext::MultiThreaded.new("test", size: 5..1)
+    end
+  end
+end

spec/std/spec_helper.cr

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ def spawn_and_check(before : Proc(_), file = __FILE__, line = __LINE__, &block :

   # This is a workaround to ensure the "before" fiber
   # is unscheduled. Otherwise it might stay alive running the event loop
-  spawn(same_thread: true) do
+  spawn(same_thread: !{{flag?(:execution_context)}}) do
     while x.get != 2
       Fiber.yield
     end

spec/support/mt_abort_timeout.cr

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ private SPEC_TIMEOUT = 15.seconds
 Spec.around_each do |example|
   done = Channel(Exception?).new

-  spawn(same_thread: true) do
+  spawn(same_thread: !{{flag?(:execution_context)}}) do
     begin
       example.run
     rescue e

src/crystal/system/thread.cr

Lines changed: 12 additions & 0 deletions
@@ -179,6 +179,18 @@ class Thread
     Crystal::System::Thread.sleep(time)
   end

+  # Delays execution for a brief moment.
+  @[NoInline]
+  def self.delay(backoff : Int32) : Int32
+    if backoff < 7
+      backoff.times { Intrinsics.pause }
+      backoff &+ 1
+    else
+      Thread.yield
+      0
+    end
+  end
+
   # Returns the Thread object associated to the running system thread.
   def self.current : Thread
     Crystal::System::Thread.current_thread
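
`Thread.delay` is a bounded spin-then-yield backoff: for small values it issues that many `pause` instructions and returns the incremented counter, otherwise it yields the thread and resets the counter to 0. A minimal sketch of how a caller might drive it in a spin-wait loop; the flag, the setter thread, and the timing are hypothetical and only illustrate the pattern:

# Hypothetical spin-wait: one thread flips a flag, another backs off while polling it.
flag = Atomic(Bool).new(false)
setter = Thread.new { Thread.sleep(1.millisecond); flag.set(true) }

backoff = 0
until flag.get
  # small backoff values burn a few `pause` instructions, larger ones call Thread.yield
  backoff = Thread.delay(backoff)
end
setter.join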
Lines changed: 299 additions & 0 deletions
@@ -0,0 +1,299 @@
+require "./global_queue"
+require "./multi_threaded/scheduler"
+
+module Fiber::ExecutionContext
+  # MT scheduler.
+  #
+  # Owns multiple threads and starts a scheduler in each one. The number of
+  # threads is dynamic. Setting the minimum and maximum to the same value will
+  # start a fixed number of threads.
+  #
+  # Fully concurrent, fully parallel: fibers running in this context can be
+  # resumed by any thread in the context; fibers can run concurrently and in
+  # parallel to each other, in addition to running in parallel to every other
+  # fiber running in other contexts.
+  class MultiThreaded
+    include ExecutionContext
+
+    getter name : String
+
+    @mutex : Thread::Mutex
+    @condition : Thread::ConditionVariable
+    protected getter global_queue : GlobalQueue
+
+    getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
+    getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
+    @event_loop_lock = Atomic(Bool).new(false)
+
+    @parked = Atomic(Int32).new(0)
+    @spinning = Atomic(Int32).new(0)
+    @size : Range(Int32, Int32)
+
+    # :nodoc:
+    protected def self.default(maximum : Int32) : self
+      new("DEFAULT", 1..maximum, hijack: true)
+    end
+
+    # Starts a context with a *maximum* number of threads. Threads aren't started
+    # right away but will be started as needed to increase parallelism up to the
+    # configured maximum.
+    def self.new(name : String, maximum : Int32) : self
+      new(name, 0..maximum)
+    end
+
+    # Starts a context with a *maximum* number of threads. Threads aren't started
+    # right away but will be started as needed to increase parallelism up to the
+    # configured maximum.
+    def self.new(name : String, size : Range(Nil, Int32)) : self
+      new(name, Range.new(0, size.end, size.exclusive?))
+    end
+
+    # Starts a context with a minimum and maximum number of threads. Only the
+    # minimum number of threads will be started right away. The minimum can be 0
+    # (or nil) in which case no threads will be started. More threads will be
+    # started as needed to increase parallelism up to the configured maximum.
+    def self.new(name : String, size : Range(Int32, Int32)) : self
+      new(name, size, hijack: false)
+    end
+
+    protected def initialize(@name : String, size : Range(Int32, Int32), hijack : Bool)
+      @size =
+        if size.exclusive?
+          (size.begin)..(size.end - 1)
+        else
+          size
+        end
+      raise ArgumentError.new("#{self.class.name} needs at least one thread") if capacity < 1
+      raise ArgumentError.new("#{self.class.name} invalid range") if @size.begin > @size.end
+
+      @mutex = Thread::Mutex.new
+      @condition = Thread::ConditionVariable.new
+
+      @global_queue = GlobalQueue.new(@mutex)
+      @schedulers = Array(Scheduler).new(capacity)
+      @threads = Array(Thread).new(capacity)
+
+      @rng = Random::PCG32.new
+
+      start_schedulers
+      start_initial_threads(hijack)
+
+      ExecutionContext.execution_contexts.push(self)
+    end
+
+    # The number of threads that have been started.
+    def size : Int32
+      @threads.size
+    end
+
+    # The maximum number of threads that can be started.
+    def capacity : Int32
+      @size.end
+    end
+
+    def stack_pool? : Fiber::StackPool?
+      @stack_pool
+    end
+
+    # Starts all schedulers at once.
+    #
+    # We could lazily initialize them as needed, like we do for threads, which
+    # would be safe as long as we only mutate when the mutex is locked... but
+    # unlike @threads, we do iterate the schedulers in #steal without locking
+    # the mutex (for obvious reasons) and there are no guarantees that the new
+    # schedulers.@size will be written after the scheduler has been written to
+    # the array's buffer.
+    #
+    # OPTIMIZE: consider storing schedulers to an array-like object that would
+    # use an atomic/fence to make sure that @size can only be incremented
+    # *after* the value has been written to @buffer.
+    private def start_schedulers
+      capacity.times do |index|
+        @schedulers << Scheduler.new(self, "#{@name}-#{index}")
+      end
+    end
+
+    private def start_initial_threads(hijack)
+      offset = 0
+
+      if hijack
+        @threads << hijack_current_thread(@schedulers[0])
+        offset += 1
+      end
+
+      offset.upto(@size.begin - 1) do |index|
+        @threads << start_thread(@schedulers[index])
+      end
+    end
+
+    # Attaches *scheduler* to the current `Thread`, usually the process' main
+    # thread. Starts a `Fiber` to run the scheduler loop.
+    private def hijack_current_thread(scheduler) : Thread
+      thread = Thread.current
+      thread.internal_name = scheduler.name
+      thread.execution_context = self
+      thread.scheduler = scheduler
+
+      scheduler.thread = thread
+      scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do
+        scheduler.run_loop
+      end
+
+      thread
+    end
+
+    # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
+    # directly in the thread's main `Fiber`.
+    private def start_thread(scheduler) : Thread
+      Thread.new(name: scheduler.name) do |thread|
+        thread.execution_context = self
+        thread.scheduler = scheduler
+
+        scheduler.thread = thread
+        scheduler.main_fiber = thread.main_fiber
+        scheduler.main_fiber.name = "#{scheduler.name}:loop"
+        scheduler.run_loop
+      end
+    end
+
+    def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
+      raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
+      self.spawn(name: name, &block)
+    end
+
+    def enqueue(fiber : Fiber) : Nil
+      if ExecutionContext.current == self
+        # local enqueue: push to local queue of current scheduler
+        ExecutionContext::Scheduler.current.enqueue(fiber)
+      else
+        # cross context: push to global queue
+        Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
+        @global_queue.push(fiber)
+        wake_scheduler
+      end
+    end
+
+    # Picks a scheduler at random then iterates all schedulers to try to steal
+    # fibers from.
+    protected def steal(& : Scheduler ->) : Nil
+      return if size == 1
+
+      i = @rng.next_int
+      n = @schedulers.size
+
+      n.times do |j|
+        if scheduler = @schedulers[(i &+ j) % n]?
+          yield scheduler
+        end
+      end
+    end
+
+    protected def park_thread(&) : Fiber?
+      @mutex.synchronize do
+        # avoid races by checking queues again
+        if fiber = yield
+          return fiber
+        end
+
+        Crystal.trace :sched, "park"
+        @parked.add(1, :acquire_release)
+
+        @condition.wait(@mutex)
+
+        # we don't decrement @parked because #wake_scheduler did
+        Crystal.trace :sched, "wakeup"
+      end
+
+      nil
+    end
+
+    # This method always runs in parallel!
+    #
+    # This can be called from any thread in the context but can also be called
+    # from external execution contexts, in which case the context may have its
+    # last thread about to park itself, and we must prevent the last thread from
+    # parking when there is a parallel cross context enqueue!
+    #
+    # OPTIMIZE: instead of blindly spending time (blocking progress on the
+    # current thread) to unpark a thread / start a new thread we could move the
+    # responsibility to an external observer to increase parallelism in a MT
+    # context when it detects pending work.
+    protected def wake_scheduler : Nil
+      # another thread is spinning: nothing to do (it shall notice the enqueue)
+      if @spinning.get(:relaxed) > 0
+        return
+      end
+
+      # interrupt a thread waiting on the event loop
+      if @event_loop_lock.get(:relaxed)
+        @event_loop.interrupt
+        return
+      end
+
+      # we can check @parked without locking the mutex because we can't push to
+      # the global queue _and_ park the thread at the same time, so either the
+      # thread is already parked (and we must wake it) or it noticed (or will
+      # notice) the fiber in the global queue;
+      #
+      # we still rely on an atomic to make sure the actual value is visible to
+      # the current thread
+      if @parked.get(:acquire) > 0
+        @mutex.synchronize do
+          # avoid race conditions
+          return if @parked.get(:relaxed) == 0
+          return if @spinning.get(:relaxed) > 0
+
+          # increase the number of spinning threads _now_ to prevent multiple
+          # threads from trying to wake up multiple threads at the same time
+          #
+          # we must also decrement the number of parked threads because another
+          # thread could lock the mutex and increment @spinning again before the
+          # signaled thread is resumed
+          spinning = @spinning.add(1, :acquire_release)
+          parked = @parked.sub(1, :acquire_release)
+
+          @condition.signal
+        end
+        return
+      end
+
+      # check if we can start another thread; no need for atomics, the values
+      # shall be rather stable over time and we check them again inside the
+      # mutex
+      return if @threads.size == capacity
+
+      @mutex.synchronize do
+        index = @threads.size
+        return if index == capacity # check again
+
+        @threads << start_thread(@schedulers[index])
+      end
+    end
+
+    # Only one thread is allowed to run the event loop. Yields then returns true
+    # if the lock was acquired, otherwise returns false immediately.
+    protected def lock_evloop?(&) : Bool
+      if @event_loop_lock.swap(true, :acquire) == false
+        begin
+          yield
+        ensure
+          @event_loop_lock.set(false, :release)
+        end
+        true
+      else
+        false
+      end
+    end
+
+    @[AlwaysInline]
+    def inspect(io : IO) : Nil
+      to_s(io)
+    end
+
+    def to_s(io : IO) : Nil
+      io << "#<" << self.class.name << ":0x"
+      object_id.to_s(io, 16)
+      io << ' ' << name << '>'
+    end
+  end
+end
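
The `#steal` helper above picks a random starting index and then visits every scheduler exactly once, wrapping around, so victims are probed in random order without repeats. A standalone sketch of the same victim-selection pattern over a plain array; the peer names and the `puts` are illustrative only, not the scheduler code:

# start somewhere random, then wrap around so each peer is visited exactly once
peers = %w[s0 s1 s2 s3]
i = Random.rand(Int32::MAX)
peers.size.times do |j|
  victim = peers[(i &+ j) % peers.size]
  puts "probing #{victim}" # in the scheduler, this is where fibers would be stolen
end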
