
Commit 603c2ff

Fix: MT context can't keep list of threads
1 parent d29fa58 commit 603c2ff

File tree

src/fiber/execution_context/multi_threaded.cr
src/fiber/execution_context/single_threaded.cr

2 files changed: +20 -26 lines changed

src/fiber/execution_context/multi_threaded.cr

Lines changed: 19 additions & 25 deletions
@@ -90,11 +90,7 @@ module Fiber::ExecutionContext
 
       @global_queue = GlobalQueue.new(@mutex)
       @schedulers = Array(Scheduler).new(capacity)
-
-      # FIXME: invalid since schedulers will be transfered to other threads,
-      # keep a mere `@size : Int32` counter instead!
-      @threads = Array(Thread).new(capacity)
-
+      @started = @size.begin
       @rng = Random::PCG32.new
 
       start_schedulers
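
The deleted FIXME is exactly what the commit title refers to: threads are checked out of a shared thread pool and may be handed back or reused, so a retained `Array(Thread)` goes stale, while the context only ever needed a count of started threads. A minimal sketch of the counter pattern, with hypothetical simplified names (`@size` is the min/max `Range` of schedulers, as in the diff):

# Sketch (hypothetical, simplified): count started threads instead of
# retaining Thread references that the pool may recycle.
class CounterSketch
  def initialize(@size : Range(Int32, Int32))
    @started = @size.begin # the minimum is started eagerly
  end

  # the reported size is just the counter; there is no Array(Thread)
  # that could go stale when threads return to the pool
  def size : Int32
    @started
  end

  def capacity : Int32
    @size.end
  end
end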
@@ -105,7 +101,7 @@ module Fiber::ExecutionContext
 
     # The number of threads that have been started.
     def size : Int32
-      @threads.size
+      @started
     end
 
     # The maximum number of threads that can be started.
@@ -120,16 +116,15 @@ module Fiber::ExecutionContext
 
     # Starts all schedulers at once.
     #
-    # We could lazily initialize them as needed, like we do for threads, which
-    # would be safe as long as we only mutate when the mutex is locked... but
-    # unlike @threads, we do iterate the schedulers in #steal without locking
-    # the mutex (for obvious reasons) and there are no guarantees that the new
-    # schedulers.@size will be written after the scheduler has been written to
-    # the array's buffer.
+    # We could lazily initialize them as needed, which would be safe as long
+    # as we only mutate while the mutex is locked, but we iterate the
+    # schedulers in #steal without locking the mutex (for obvious reasons)
+    # and there are no guarantees that the new schedulers.@size will be
+    # written after the scheduler has been written to the array's buffer.
     #
     # OPTIMIZE: consider storing schedulers to an array-like object that would
-    # use an atomic/fence to make sure that @size can only be incremented
-    # *after* the value has been written to @buffer.
+    # use an atomic/fence to make sure that @size is only incremented *after*
+    # the value has been written to @buffer.
     private def start_schedulers
       capacity.times do |index|
         @schedulers << Scheduler.new(self, "#{@name}-#{index}")
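
The OPTIMIZE note describes a publication-ordering problem: without a fence, a lock-free reader in #steal could observe the incremented size before the element itself is visible in the buffer. A hedged sketch of the suggested array-like object (the name `FixedBuffer` and the single-writer assumption are mine, not the commit's), using Crystal's `Atomic` orderings:

# Hypothetical sketch: publish the element before the size. Writers are
# still expected to hold the context's mutex; only readers go lock-free.
class FixedBuffer(T)
  def initialize(capacity : Int32)
    @buffer = Pointer(T).malloc(capacity)
    @size = Atomic(Int32).new(0)
  end

  def push(value : T) : Nil
    size = @size.get(:relaxed)
    @buffer[size] = value         # 1. write the element to the buffer
    @size.set(size + 1, :release) # 2. only then publish the new size
  end

  # pairs with the release store in #push: a reader that observes
  # size >= i is guaranteed to also observe the element at index i
  def size : Int32
    @size.get(:acquire)
  end

  def unsafe_fetch(index : Int32) : T
    @buffer[index]
  end
end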
@@ -140,18 +135,18 @@ module Fiber::ExecutionContext
       offset = 0
 
       if hijack
-        @threads << hijack_current_thread(@schedulers[0])
+        hijack_current_thread(@schedulers[0])
         offset += 1
       end
 
       offset.upto(@size.begin - 1) do |index|
-        @threads << start_thread(@schedulers[index])
+        start_thread(@schedulers[index])
       end
     end
 
     # Attaches *scheduler* to the current `Thread`, usually the process' main
     # thread. Starts a `Fiber` to run the scheduler loop.
-    private def hijack_current_thread(scheduler) : Thread
+    private def hijack_current_thread(scheduler) : Nil
       thread = Thread.current
       thread.internal_name = scheduler.name
       thread.execution_context = self
@@ -161,7 +156,7 @@ module Fiber::ExecutionContext
 
     # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
     # directly in the thread's main `Fiber`.
-    private def start_thread(scheduler) : Thread
+    private def start_thread(scheduler) : Nil
       ExecutionContext.thread_pool.checkout(scheduler)
     end
 
@@ -181,7 +176,7 @@ module Fiber::ExecutionContext
         # local enqueue: push to local queue of current scheduler
         ExecutionContext::Scheduler.current.enqueue(fiber)
       else
-        # cross context: push to global queue
+        # cross context or detached thread: push to global queue
         Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
         @global_queue.push(fiber)
         wake_scheduler
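
Both enqueue hunks document the same routing rule: only a fiber enqueued from a scheduler of this very context may use that scheduler's local queue; a call from another context, or from a thread currently detached from any scheduler, must go through the mutex-protected global queue, since there is no local queue it could safely touch. In outline (a sketch of the surrounding method, using the names visible in the diff, not the verbatim implementation):

# Outline of the dispatch around the changed comment (sketch).
def enqueue(fiber : Fiber) : Nil
  if ExecutionContext.current == self
    # local enqueue: the calling scheduler owns its local queue
    ExecutionContext::Scheduler.current.enqueue(fiber)
  else
    # cross context or detached thread: the global queue is protected
    # by @mutex, so any thread may push to it
    @global_queue.push(fiber)
    wake_scheduler
  end
end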
@@ -213,8 +208,8 @@ module Fiber::ExecutionContext
       Crystal.trace :sched, "park"
       @parked.add(1, :acquire_release)
 
-      # TODO: consider detaching the scheduler and returning the thread back
-      # into ExecutionContext.thread_pool instead
+      # TODO: detach the scheduler and return the thread back into ThreadPool
+      # instead
       @condition.wait(@mutex)
 
       # we don't decrement @parked because #wake_scheduler did
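
The park/wake pairing explains the "we don't decrement" comment: the waker, not the sleeper, decrements `@parked`, which claims exactly one parked scheduler and avoids waking more threads than needed. A hedged sketch of both sides (simplified; the orderings and fast-path checks are assumptions, the real methods do more):

# Hypothetical sketch of the park/wake pairing.
def park : Nil
  @mutex.synchronize do
    @parked.add(1, :acquire_release) # announce one more parked scheduler
    @condition.wait(@mutex)          # releases @mutex while asleep
    # no decrement here: #wake_scheduler already decremented @parked,
    # thereby reserving this wakeup for exactly one waker
  end
end

def wake_scheduler : Nil
  return if @parked.get(:acquire) == 0 # fast path: nobody is parked
  @mutex.synchronize do
    if @parked.get(:relaxed) > 0
      @parked.sub(1, :acquire_release) # claim one parked scheduler
      @condition.signal                # wake a single waiter
    end
  end
end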
@@ -277,13 +272,12 @@ module Fiber::ExecutionContext
       # check if we can start another thread; no need for atomics, the values
       # shall be rather stable over time and we check them again inside the
      # mutex
-      return if @threads.size == capacity
+      return if @started == capacity
 
       @mutex.synchronize do
-        index = @threads.size
+        index = @started
         return if index == capacity # check again
-
-        @threads << start_thread(@schedulers[index])
+        start_thread(@schedulers[index])
       end
     end
 
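
The two capacity checks form a classic double-checked pattern: the unsynchronized fast-path read may be stale, but a stale value can only delay a thread start, never overshoot, because the authoritative re-check happens under `@mutex`, the only place `@started` is mutated. A generic sketch of the pattern (the increment is hypothetical; this hunk does not show where the counter is actually bumped):

# Hypothetical sketch of the double-checked capacity test.
def start_thread_if_needed : Nil
  # 1. racy fast path: no lock, a stale read is harmless
  return if @started == capacity

  @mutex.synchronize do
    # 2. authoritative check: @started only changes under @mutex
    index = @started
    return if index == capacity
    @started = index + 1 # hypothetical: increment not shown in the hunk
    start_thread(@schedulers[index])
  end
end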

src/fiber/execution_context/single_threaded.cr

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ module Fiber::ExecutionContext
       Crystal.trace :sched, "enqueue", fiber: fiber
       @runnables.push(fiber)
     else
-      # cross context enqueue
+      # cross context or detached thread enqueue
       Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
       @global_queue.push(fiber)
       wake_scheduler
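
The single-threaded context gets the same comment fix for the same reason: an enqueue may originate on a thread that currently runs no scheduler at all (for example, one idling in the thread pool), and that path must also use the mutex-protected global queue. A hedged usage sketch; the constructor names and arguments follow the execution-context proposal and should be treated as assumptions:

# Hypothetical usage: spawning across contexts forces the global-queue path.
st = Fiber::ExecutionContext::SingleThreaded.new("st")
mt = Fiber::ExecutionContext::MultiThreaded.new("mt", 4)

st.spawn do
  # runs inside st; spawning into mt from here is a cross-context
  # enqueue and goes through mt's global queue, then wakes a scheduler
  mt.spawn { puts "hello from mt" }
end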
