Description
LibEvent
is all about detecting when a file/socket is ready to be read or written without blocking. A non-blocking read from a socket is thus a sequence of two steps: first wait for it to be readable, then read from it without blocking. If the system happens to return a EWOULDBLOCK
error, just try waiting for it to be readable again. This is the reason IO::Evented
exists. For example:
def evented_read(slice : Bytes, errno_msg : String) : Int32
loop do
bytes_read = yield slice
if bytes_read != -1
# `to_i32` is acceptable because `Slice#size` is an Int32
return bytes_read.to_i32
end
if Errno.value == Errno::EAGAIN
wait_readable
else
raise IO::Error.from_errno(errno_msg)
end
end
ensure
resume_pending_readers
end
The problem is that not all async IO work like that. Windows' Overlapped IO works by submitting a "read request" to the system and them waiting for an event when it completes in a single step. Linux's io_uring is very similar.
It is not viable to implement efficient async IO on Windows or on Linux's io_uring on top of the current IO::Evented
interface. Please refer to these comments by ysbaddaden and RX14 as well: #8651.
I propose changing Crystal::EventLoop
s interface to this:
module Crystal::EventLoop
# Runs the event loop.
def self.run_once : Nil
end
{% unless flag?(:preview_mt) %}
# Reinitializes the event loop after a fork.
def self.after_fork : Nil
end
{% end %}
# All methods below will block the current fiber until the operation is complete.
# run_once is responsible for enqueueing fibers that are ready to return.
# Blocks the current fiber for `time`.
def self.sleep(time : Time::Span) : Nil
end
# Reads at least one byte from a file or pipe
def self.read(fd, slice : Bytes) : Int32
end
# Writes at least one byte to a file or pipe
def self.write(fd, slice : Bytes) : Int32
end
# Reads at least one byte from a socket
def self.receive(fd, slice : Bytes) : Int32
end
# Reads at least one byte from a socket, obtaining the source address of the packet (UDP)
def self.receive_from(fd, slice : Bytes) : {Int32, ::Socket::Address}
end
# Writes at least one byte to a socket
def self.send(fd, slice : Bytes) : Int32
end
# Writes at least one byte to a socket with a target address (UDP)
def self.send_to(fd, slice : Bytes, address : ::Socket::Address) : Int32
end
# Accepts a incomming TCP connection
def self.accept(fd) : Int32
end
# Opens a connection
def self.connect(fd, address : ::Socket::Address) : Int32
end
# Closes a file
def self.close(fd) : Int32
end
# Synchronizes data and metadata of a file to persistent storage
def self.fsync(fd) : Int32
end
# Synchronizes data of a file to persistent storage
def self.fdatasync(fd) : Int32
end
# Waits until a file is ready for a non-blocking read operation
def self.wait_readable(fd) : Nil
end
# Waits until a file is ready for a non-blocking write operation
def self.wait_writable(fd) : Nil
end
end
All of these can be implemented for Unix either by blocking or by using wait_readable
/wait_writable
and calling into LibC when ready. This interface allows other async implementations to exist without changes to files outside Crystal::System
.
I believe this makes the Windows implementation cleaner as well. What do you think @straight-shoota?