Skip to content

RFC: Refactor Crystal::EventLoop to disconnect it from LibEvent #10766

Closed
@lbguilherme

Description

@lbguilherme

LibEvent is all about detecting when a file/socket is ready to be read or written without blocking. A non-blocking read from a socket is thus a sequence of two steps: first wait for it to be readable, then read from it without blocking. If the system happens to return a EWOULDBLOCK error, just try waiting for it to be readable again. This is the reason IO::Evented exists. For example:

def evented_read(slice : Bytes, errno_msg : String) : Int32
  loop do
    bytes_read = yield slice
    if bytes_read != -1
      # `to_i32` is acceptable because `Slice#size` is an Int32
      return bytes_read.to_i32
    end

    if Errno.value == Errno::EAGAIN
      wait_readable
    else
      raise IO::Error.from_errno(errno_msg)
    end
  end
ensure
  resume_pending_readers
end

The problem is that not all async IO work like that. Windows' Overlapped IO works by submitting a "read request" to the system and them waiting for an event when it completes in a single step. Linux's io_uring is very similar.

It is not viable to implement efficient async IO on Windows or on Linux's io_uring on top of the current IO::Evented interface. Please refer to these comments by ysbaddaden and RX14 as well: #8651.


I propose changing Crystal::EventLoops interface to this:

module Crystal::EventLoop
  # Runs the event loop.
  def self.run_once : Nil
  end

  {% unless flag?(:preview_mt) %}
    # Reinitializes the event loop after a fork.
    def self.after_fork : Nil
    end
  {% end %}

  # All methods below will block the current fiber until the operation is complete.
  # run_once is responsible for enqueueing fibers that are ready to return.

  # Blocks the current fiber for `time`.
  def self.sleep(time : Time::Span) : Nil
  end

  # Reads at least one byte from a file or pipe
  def self.read(fd, slice : Bytes) : Int32
  end

  # Writes at least one byte to a file or pipe
  def self.write(fd, slice : Bytes) : Int32
  end

  # Reads at least one byte from a socket
  def self.receive(fd, slice : Bytes) : Int32
  end

  # Reads at least one byte from a socket, obtaining the source address of the packet (UDP)
  def self.receive_from(fd, slice : Bytes) : {Int32, ::Socket::Address}
  end

  # Writes at least one byte to a socket
  def self.send(fd, slice : Bytes) : Int32
  end

  # Writes at least one byte to a socket with a target address (UDP)
  def self.send_to(fd, slice : Bytes, address : ::Socket::Address) : Int32
  end

  # Accepts a incomming TCP connection
  def self.accept(fd) : Int32
  end

  # Opens a connection
  def self.connect(fd, address : ::Socket::Address) : Int32
  end

  # Closes a file
  def self.close(fd) : Int32
  end
  
  # Synchronizes data and metadata of a file to persistent storage
  def self.fsync(fd) : Int32
  end

  # Synchronizes data of a file to persistent storage
  def self.fdatasync(fd) : Int32
  end

  # Waits until a file is ready for a non-blocking read operation
  def self.wait_readable(fd) : Nil
  end
  
  # Waits until a file is ready for a non-blocking write operation
  def self.wait_writable(fd) : Nil
  end
end

All of these can be implemented for Unix either by blocking or by using wait_readable/wait_writable and calling into LibC when ready. This interface allows other async implementations to exist without changes to files outside Crystal::System.

I believe this makes the Windows implementation cleaner as well. What do you think @straight-shoota?

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions