Skip to content

Support asynchronous file I/O on Windows #14321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
6 changes: 3 additions & 3 deletions spec/std/file/tempfile_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ describe Crystal::System::File do
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
path.should eq Path[tempdir, "A789abcdeZ"].to_s
ensure
File.from_fd(path, fd).close if fd && path
IO::FileDescriptor.new(fd).close if fd
end
end

Expand All @@ -212,7 +212,7 @@ describe Crystal::System::File do
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]))
path.should eq File.join(tempdir, "AfghijklmZ")
ensure
File.from_fd(path, fd).close if fd && path
IO::FileDescriptor.new(fd).close if fd
end
end

Expand All @@ -223,7 +223,7 @@ describe Crystal::System::File do
expect_raises(File::AlreadyExistsError, "Error creating temporary file") do
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
ensure
File.from_fd(path, fd).close if fd && path
IO::FileDescriptor.new(fd).close if fd
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module Crystal::System::File
io << suffix
end

fd, errno = open(path, mode, perm)
fd, errno = open(path, mode, perm, blocking: true)

if errno.none?
return {fd, path}
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/unix/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require "file/error"

# :nodoc:
module Crystal::System::File
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions)
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking _blocking)
perm = ::File::Permissions.new(perm) if perm.is_a? Int32

fd, errno = open(filename, open_flag(mode), perm)
Expand Down
4 changes: 2 additions & 2 deletions src/crystal/system/unix/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ module Crystal::System::FileDescriptor
{r, w}
end

def self.pread(fd, buffer, offset)
bytes_read = LibC.pread(fd, buffer, buffer.size, offset).to_i64
def self.pread(file, buffer, offset)
bytes_read = LibC.pread(file.fd, buffer, buffer.size, offset).to_i64

if bytes_read == -1
raise IO::Error.from_errno "Error reading file"
Expand Down
82 changes: 58 additions & 24 deletions src/crystal/system/win32/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require "c/ntifs"
require "c/winioctl"

module Crystal::System::File
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions) : LibC::Int
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : LibC::Int
perm = ::File::Permissions.new(perm) if perm.is_a? Int32
# Only the owner writable bit is used, since windows only supports
# the read only attribute.
Expand All @@ -19,16 +19,16 @@ module Crystal::System::File
perm = LibC::S_IREAD
end

fd, errno = open(filename, open_flag(mode), ::File::Permissions.new(perm))
fd, errno = open(filename, open_flag(mode), ::File::Permissions.new(perm), blocking != false)
unless errno.none?
raise ::File::Error.from_os_error("Error opening file with mode '#{mode}'", errno, file: filename)
end

fd
end

def self.open(filename : String, flags : Int32, perm : ::File::Permissions) : {LibC::Int, Errno}
access, disposition, attributes = self.posix_to_open_opts flags, perm
def self.open(filename : String, flags : Int32, perm : ::File::Permissions, blocking : Bool) : {LibC::Int, Errno}
access, disposition, attributes = self.posix_to_open_opts flags, perm, blocking

handle = LibC.CreateFileW(
System.to_wstr(filename),
Expand Down Expand Up @@ -56,7 +56,7 @@ module Crystal::System::File
{fd, Errno::NONE}
end

private def self.posix_to_open_opts(flags : Int32, perm : ::File::Permissions)
private def self.posix_to_open_opts(flags : Int32, perm : ::File::Permissions, blocking : Bool)
access = if flags.bits_set? LibC::O_WRONLY
LibC::FILE_GENERIC_WRITE
elsif flags.bits_set? LibC::O_RDWR
Expand Down Expand Up @@ -86,7 +86,7 @@ module Crystal::System::File
disposition = LibC::OPEN_EXISTING
end

attributes = LibC::FILE_ATTRIBUTE_NORMAL
attributes = 0
unless perm.owner_write?
attributes |= LibC::FILE_ATTRIBUTE_READONLY
end
Expand All @@ -106,6 +106,10 @@ module Crystal::System::File
attributes |= LibC::FILE_FLAG_RANDOM_ACCESS
end

unless blocking
attributes |= LibC::FILE_FLAG_OVERLAPPED
end

{access, disposition, attributes}
end

Expand Down Expand Up @@ -478,41 +482,71 @@ module Crystal::System::File
end

private def flock(exclusive, retry)
flags = LibC::LOCKFILE_FAIL_IMMEDIATELY
flags = 0_u32
flags |= LibC::LOCKFILE_FAIL_IMMEDIATELY unless retry && !system_blocking?
flags |= LibC::LOCKFILE_EXCLUSIVE_LOCK if exclusive

handle = windows_handle
if retry
if retry && system_blocking?
until lock_file(handle, flags)
sleep 0.1
end
else
lock_file(handle, flags) || raise IO::Error.from_winerror("Error applying file lock: file is already locked")
lock_file(handle, flags) || raise IO::Error.from_winerror("Error applying file lock: file is already locked", target: self)
end
end

private def lock_file(handle, flags)
# lpOverlapped must be provided despite the synchronous use of this method.
overlapped = LibC::OVERLAPPED.new
# lock the entire file with offset 0 in overlapped and number of bytes set to max value
if 0 != LibC.LockFileEx(handle, flags, 0, 0xFFFF_FFFF, 0xFFFF_FFFF, pointerof(overlapped))
true
else
winerror = WinError.value
if winerror == WinError::ERROR_LOCK_VIOLATION
false
IO::Overlapped::OverlappedOperation.run(handle) do |operation|
overlapped = operation.start
result = LibC.LockFileEx(handle, flags, 0, 0xFFFF_FFFF, 0xFFFF_FFFF, overlapped)

if result == 0
case error = WinError.value
when .error_io_pending?
# the operation is running asynchronously; do nothing
when .error_lock_violation?
# synchronous failure
operation.synchronous = true
return false
else
raise IO::Error.from_os_error("LockFileEx", error, target: self)
end
else
raise IO::Error.from_os_error("LockFileEx", winerror, target: self)
operation.synchronous = true
return true
end

schedule_overlapped(nil)

operation.result(handle) do |error|
raise IO::Error.from_os_error("LockFileEx", error, target: self)
end

true
end
end

private def unlock_file(handle)
# lpOverlapped must be provided despite the synchronous use of this method.
overlapped = LibC::OVERLAPPED.new
# unlock the entire file with offset 0 in overlapped and number of bytes set to max value
if 0 == LibC.UnlockFileEx(handle, 0, 0xFFFF_FFFF, 0xFFFF_FFFF, pointerof(overlapped))
raise IO::Error.from_winerror("UnLockFileEx")
IO::Overlapped::OverlappedOperation.run(handle) do |operation|
overlapped = operation.start
result = LibC.UnlockFileEx(handle, 0, 0xFFFF_FFFF, 0xFFFF_FFFF, overlapped)

if result == 0
error = WinError.value
unless error.error_io_pending?
raise IO::Error.from_os_error("UnlockFileEx", error, target: self)
end
else
operation.synchronous = true
return
end

schedule_overlapped(nil)

operation.result(handle) do |error|
raise IO::Error.from_os_error("UnlockFileEx", error, target: self)
end
end
end

Expand Down
36 changes: 15 additions & 21 deletions src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ module Crystal::System::FileDescriptor
when .error_access_denied?
raise IO::Error.new "File not open for reading", target: self
when .error_broken_pipe?
return 0_u32
return 0
else
raise IO::Error.from_os_error("Error reading file", error, target: self)
end
end
bytes_read
bytes_read.to_i32
else
overlapped_operation(handle, "ReadFile", read_timeout) do |overlapped|
seekable = LibC.SetFilePointerEx(handle, 0, out offset, IO::Seek::Current) != 0
overlapped_operation(handle, seekable ? offset : nil, "ReadFile", read_timeout, writing: false) do |overlapped|
ret = LibC.ReadFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
end.to_i32
end
end

Expand All @@ -43,13 +44,14 @@ module Crystal::System::FileDescriptor
when .error_access_denied?
raise IO::Error.new "File not open for writing", target: self
when .error_broken_pipe?
return 0_u32
return
else
raise IO::Error.from_os_error("Error writing file", error, target: self)
end
end
else
bytes_written = overlapped_operation(handle, "WriteFile", write_timeout, writing: true) do |overlapped|
seekable = LibC.SetFilePointerEx(handle, 0, out offset, IO::Seek::Current) != 0
bytes_written = overlapped_operation(handle, seekable ? offset : nil, "WriteFile", write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
Expand All @@ -71,6 +73,7 @@ module Crystal::System::FileDescriptor

private def system_blocking_init(value)
@system_blocking = value
Crystal::Scheduler.event_loop.create_completion_port(windows_handle) unless value
end

private def system_close_on_exec?
Expand Down Expand Up @@ -185,13 +188,11 @@ module Crystal::System::FileDescriptor
w_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless write_blocking
w_pipe = LibC.CreateNamedPipeA(pipe_name, w_pipe_flags, pipe_mode, 1, PIPE_BUFFER_SIZE, PIPE_BUFFER_SIZE, 0, nil)
raise IO::Error.from_winerror("CreateNamedPipeA") if w_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(w_pipe) unless write_blocking

r_pipe_flags = LibC::FILE_FLAG_NO_BUFFERING
r_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless read_blocking
r_pipe = LibC.CreateFileW(System.to_wstr(pipe_name), LibC::GENERIC_READ | LibC::FILE_WRITE_ATTRIBUTES, 0, nil, LibC::OPEN_EXISTING, r_pipe_flags, nil)
raise IO::Error.from_winerror("CreateFileW") if r_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(r_pipe) unless read_blocking

r = IO::FileDescriptor.new(LibC._open_osfhandle(r_pipe, 0), read_blocking)
w = IO::FileDescriptor.new(LibC._open_osfhandle(w_pipe, 0), write_blocking)
Expand All @@ -200,19 +201,12 @@ module Crystal::System::FileDescriptor
{r, w}
end

def self.pread(fd, buffer, offset)
handle = windows_handle!(fd)

overlapped = LibC::OVERLAPPED.new
overlapped.union.offset.offset = LibC::DWORD.new(offset)
overlapped.union.offset.offsetHigh = LibC::DWORD.new(offset >> 32)
if LibC.ReadFile(handle, buffer, buffer.size, out bytes_read, pointerof(overlapped)) == 0
error = WinError.value
return 0_i64 if error == WinError::ERROR_HANDLE_EOF
raise IO::Error.from_os_error "Error reading file", error, target: self
end

bytes_read.to_i64
def self.pread(file, buffer, offset)
Copy link
Contributor

@yxhuvud yxhuvud Feb 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is high time to rename this method as the name refers to a libc call that isn't used on this OS.

read_with_offset perhaps?

handle = windows_handle!(file.fd)
file.overlapped_operation(handle, offset, "ReadFile", file.read_timeout, writing: false) do |overlapped|
ret = LibC.ReadFile(handle, buffer, buffer.size, out byte_count, overlapped)
{ret, byte_count}
end.to_i64
end

def self.from_stdio(fd)
Expand Down
2 changes: 1 addition & 1 deletion src/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class File < IO::FileDescriptor
# additional syscall.
def self.new(filename : Path | String, mode = "r", perm = DEFAULT_CREATE_PERMISSIONS, encoding = nil, invalid = nil, blocking = true)
filename = filename.to_s
fd = Crystal::System::File.open(filename, mode, perm: perm)
fd = Crystal::System::File.open(filename, mode, perm: perm, blocking: blocking)
new(filename, fd, blocking: blocking, encoding: encoding, invalid: invalid)
end

Expand Down
2 changes: 1 addition & 1 deletion src/file/preader.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class File::PReader < IO
count = slice.size
count = Math.min(count, @bytesize - @pos)

bytes_read = Crystal::System::FileDescriptor.pread(@file.fd, slice[0, count], @offset + @pos)
bytes_read = Crystal::System::FileDescriptor.pread(@file, slice[0, count], @offset + @pos)

@pos += bytes_read

Expand Down
17 changes: 14 additions & 3 deletions src/io/overlapped.cr
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,14 @@ module IO::Overlapped
Crystal::Scheduler.event_loop.dequeue(timeout_event)
end

def overlapped_operation(handle, method, timeout, *, writing = false, &)
protected def overlapped_operation(handle, offset, method, timeout, *, writing = false, &) : UInt32
OverlappedOperation.run(handle) do |operation|
result, value = yield operation.start
overlapped = operation.start
if offset
overlapped.value.union.offset.offset = LibC::DWORD.new(offset)
overlapped.value.union.offset.offsetHigh = LibC::DWORD.new(offset >> 32)
end
result, value = yield overlapped

if result == 0
case error = WinError.value
Expand All @@ -221,12 +226,13 @@ module IO::Overlapped
end
else
operation.synchronous = true
LibC.SetFilePointerEx(handle, value, nil, IO::Seek::Current) if offset
return value
end

schedule_overlapped(timeout)

operation.result(handle) do |error|
byte_count = operation.result(handle) do |error|
case error
when .error_io_incomplete?
raise IO::TimeoutError.new("#{method} timed out")
Expand All @@ -237,6 +243,11 @@ module IO::Overlapped
return 0_u32
end
end

# the file pointer might have been changed while the operation was in
# progress, so we can't use `IO::Seek::Current` here
LibC.SetFilePointerEx(handle, offset + byte_count, nil, IO::Seek::Set) if offset
byte_count
end
end

Expand Down
Loading