Skip to content

Commit 8544bea

Browse files
committed
Add Fiber::Execution.syscall(&)
Marks the scheduler as running a blocking syscall. The monitor thread now ticks every 10ms to check whether any scheduler in any ST or MT context is blocked on a syscall, and if so tries to detach the scheduler from its thread. On success the scheduler is moved to another thread taken from the thread pool. The fiber doing the blocking syscall stays blocked, but other fibers may be resumed by the scheduler. When the blocking syscall returns, the thread tries to unmark the scheduler as running a blocking syscall. On success it simply returns. On failure, it enqueues the current fiber back into its execution context and checks itself back into the thread pool.
1 parent 3b91eac commit 8544bea

File tree

6 files changed

+152
-5
lines changed

6 files changed

+152
-5
lines changed

src/fiber.cr

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,15 @@ class Fiber
355355
{% end %}
356356
end
357357

358+
# :nodoc:
359+
def self.syscall(&)
360+
{% if flag?(:execution_context) %}
361+
ExecutionContext::Scheduler.current.syscall { yield }
362+
{% else %}
363+
yield
364+
{% end %}
365+
end
366+
358367
def to_s(io : IO) : Nil
359368
io << "#<" << self.class.name << ":0x"
360369
object_id.to_s(io, 16)

src/fiber/execution_context/isolated.cr

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,27 @@ module Fiber::ExecutionContext
233233
# Returns the current status of the scheduler: `"event-loop"`, `"syscall"`,
# `"running"` or `"shutdown"`.
def status : String
  if @waiting
    "event-loop"
  elsif @syscall.get(:relaxed).bits_set?(SYSCALL_FLAG)
    # `@syscall` is an Atomic(UInt32): read its value before testing the flag.
    # Comparing the Atomic wrapper itself against SYSCALL_FLAG never matches.
    "syscall"
  elsif @running
    "running"
  else
    "shutdown"
  end
end
end
244+
245+
protected def enter_syscall : UInt32
246+
@syscall.lazy_set(SYSCALL_FLAG)
247+
end
248+
249+
protected def leave_syscall?(value : UInt32) : Bool
250+
@syscall.lazy_set(0_u32)
251+
true
252+
end
253+
254+
# :nodoc:
255+
def syscall(& : -> U) : U forall U
256+
yield
257+
end
242258
end
243259
end

src/fiber/execution_context/monitor.cr

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
module Fiber::ExecutionContext
22
# :nodoc:
33
class Monitor
4-
DEFAULT_EVERY = 5.seconds
4+
# :nodoc:
5+
struct Timer
6+
def initialize(@every : Time::Span)
7+
@last = Time.monotonic
8+
end
9+
10+
def elapsed?(now : Time::Span) : Bool
11+
ret = @last + @every <= now
12+
@last = now if ret
13+
ret
14+
end
15+
end
16+
17+
DEFAULT_EVERY = 10.milliseconds
518

619
@thread : Thread?
720

821
def initialize(@every = DEFAULT_EVERY)
22+
@stack_collect_timer = Timer.new(5.seconds)
923
@thread = Thread.new(name: "SYSMON") { run_loop }
1024
end
1125

@@ -35,7 +49,8 @@ module Fiber::ExecutionContext
3549
# OS.
3650
private def run_loop : Nil
3751
every do |now|
38-
collect_stacks
52+
transfer_schedulers_blocked_on_syscall
53+
collect_stacks if @stack_collect_timer.elapsed?(now)
3954
end
4055
end
4156

@@ -55,6 +70,30 @@ module Fiber::ExecutionContext
5570
end
5671
end
5772

73+
# Iterates each ExecutionContext::Scheduler and transfers the Scheduler for
74+
# any Thread currently blocked on a syscall.
75+
#
76+
# OPTIMIZE: a scheduler in a MT context might not need to be transferred if
77+
# its queue is empty and another scheduler in the context is blocked on the
78+
# event loop.
79+
private def transfer_schedulers_blocked_on_syscall : Nil
80+
ExecutionContext.each do |execution_context|
81+
if execution_context.responds_to?(:each_scheduler)
82+
execution_context.each_scheduler do |scheduler|
83+
next unless scheduler.detach_syscall?
84+
85+
Crystal.trace :sched, "reassociate",
86+
scheduler: scheduler,
87+
syscall: scheduler.thread.current_fiber
88+
89+
pool = ExecutionContext.thread_pool
90+
pool.detach(scheduler.thread)
91+
pool.checkout(scheduler)
92+
end
93+
end
94+
end
95+
end
96+
5897
# Iterates each execution context and collects unused fiber stacks.
5998
#
6099
# OPTIMIZE: should maybe happen during GC collections (?)

src/fiber/execution_context/multi_threaded/scheduler.cr

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ module Fiber::ExecutionContext
126126

127127
# nothing to do: start spinning
128128
spinning do
129+
# usually empty but the scheduler may have been transferred to another
130+
# thread with queued fibers
131+
yield @runnables.shift?
132+
129133
yield @global_queue.grab?(@runnables, divisor: @execution_context.size)
130134

131135
if @execution_context.lock_evloop? { @event_loop.run(pointerof(list), blocking: false) }
@@ -257,6 +261,8 @@ module Fiber::ExecutionContext
257261
"event-loop"
258262
elsif @parked
259263
"parked"
264+
elsif @syscall.get(:relaxed).bits_set?(SYSCALL_FLAG)
265+
"syscall"
260266
else
261267
"running"
262268
end

src/fiber/execution_context/scheduler.cr

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ module Fiber::ExecutionContext
66
Thread.current.scheduler
77
end
88

9+
@[AlwaysInline]
10+
def self.current? : Scheduler?
11+
Thread.current.scheduler?
12+
end
13+
914
protected abstract def thread : Thread
1015
protected abstract def execution_context : ExecutionContext
1116

@@ -68,5 +73,57 @@ module Fiber::ExecutionContext
6873
# Returns the current status of the scheduler. For example `"running"`,
6974
# `"event-loop"` or `"parked"`.
7075
abstract def status : String
76+
77+
# :nodoc:
78+
SYSCALL_FLAG = 1_u32
79+
80+
# :nodoc:
81+
# used to increment the counter to avoid ABA issues
82+
SYSCALL_INCREMENT = 2_u32
83+
84+
@syscall = Atomic(UInt32).new(0_u32)
85+
86+
# :nodoc:
87+
#
88+
# Marks the current scheduler as doing a syscall that may block the current
89+
# thread for an unknown time. This allows the monitor thread to move the
90+
# scheduler to another thread, without waiting for the syscall to complete.
91+
#
92+
# When the syscall eventually completes, the thread will check if the
93+
# scheduler has been detached and either continue running the current fiber if
94+
# the scheduler is still attached, or enqueue the current fiber back into
95+
# its execution context and return itself to the thread pool.
96+
def syscall(& : -> U) : U forall U
97+
scheduler = Scheduler.current
98+
fiber = Fiber.current
99+
value = scheduler.enter_syscall
100+
101+
ret = yield
102+
103+
if !scheduler.leave_syscall?(value)
104+
# the scheduler has been detached: enqueue the current fiber back into
105+
# its execution context, and return the thread back into the thread pool
106+
scheduler.execution_context.enqueue(fiber)
107+
ExecutionContext.thread_pool.checkin
108+
end
109+
110+
ret
111+
end
112+
113+
# Sets the syscall flag and bumps the counter in a single atomic add, then
# returns the resulting value, which is the value later passed to
# `leave_syscall?`.
protected def enter_syscall : UInt32
  delta = SYSCALL_FLAG | SYSCALL_INCREMENT

  # The atomic add yields the previous value; recompute the new value with the
  # wrapping `&+` operator (as `leave_syscall?` does) so a wrapped counter
  # doesn't raise OverflowError here.
  @syscall.add(delta, :acquire_release) &+ delta
end
117+
118+
protected def leave_syscall?(value : UInt32) : Bool
119+
new_value = (value & ~SYSCALL_FLAG) &+ SYSCALL_INCREMENT
120+
_, success = @syscall.compare_and_set(value, new_value, :acquire_release, :relaxed)
121+
success
122+
end
123+
124+
protected def detach_syscall? : Bool
125+
value = @syscall.get(:relaxed)
126+
value.bits_set?(SYSCALL_FLAG) && leave_syscall?(value)
127+
end
71128
end
72129
end

src/fiber/execution_context/single_threaded.cr

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ module Fiber::ExecutionContext
9191
ExecutionContext.thread_pool.checkout(self)
9292
end
9393

94+
protected def each_scheduler(& : Scheduler ->) : Nil
95+
yield self.as(Scheduler)
96+
end
97+
9498
# :nodoc:
9599
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
96100
# whatever the value of same_thread: the fibers will always run on the
@@ -122,14 +126,24 @@ module Fiber::ExecutionContext
122126
end
123127
end
124128

129+
# FIXME: duplicates MultiThreaded::Scheduler#resume
125130
protected def resume(fiber : Fiber) : Nil
126-
unless fiber.resumable?
131+
Crystal.trace :sched, "resume", fiber: fiber
132+
133+
# fibers should always be ready to be resumed in the ST environment,
134+
# unless SYSMON moved the scheduler to another thread while the fiber was
135+
# blocked on a syscall; upon return the original thread might have
136+
# enqueued the fiber but hasn't swapped context yet, while this thread already
137+
# tries to resume it
138+
attempts = 0
139+
140+
until fiber.resumable?
127141
if fiber.dead?
128142
raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})"
129-
else
130-
raise "BUG: can't resume running fiber #{fiber} (#{inspect})"
131143
end
144+
attempts = Thread.delay(attempts)
132145
end
146+
133147
swapcontext(fiber)
134148
end
135149

@@ -186,6 +200,10 @@ module Fiber::ExecutionContext
186200

187201
# nothing to do: start spinning
188202
spinning do
203+
# usually empty but the scheduler may have been transferred to another
204+
# thread with queued fibers
205+
yield @runnables.shift?
206+
189207
yield @global_queue.grab?(@runnables, divisor: 1)
190208

191209
@event_loop.run(pointerof(list), blocking: false)
@@ -272,6 +290,8 @@ module Fiber::ExecutionContext
272290
"spinning"
273291
elsif @waiting.get(:relaxed)
274292
"event-loop"
293+
elsif @syscall.get(:relaxed).bits_set?(SYSCALL_FLAG)
294+
"syscall"
275295
else
276296
"running"
277297
end

0 commit comments

Comments
 (0)