Skip to content

Add missing mode property to fake file wrapper #1163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The released versions correspond to PyPI releases.
(see [#1121](../../issues/1121))
* fixed workaround for recursion with pytest under Windows to ignore capitalization
of pytest executable (see [#1096](../../issues/1096))
* added missing `mode` property to fake file wrapper (see [#1162](../../issues/1096))

### Infrastructure
* adapt test for increased default buffer size in Python 3.14a6
Expand Down
53 changes: 33 additions & 20 deletions pyfakefs/fake_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,8 @@ def __init__(
self,
file_object: FakeFile,
file_path: AnyStr,
update: bool,
read: bool,
append: bool,
open_modes: _OpenModes,
allow_update: bool,
delete_on_close: bool,
filesystem: "FakeFilesystem",
newline: Optional[str],
Expand All @@ -768,9 +767,8 @@ def __init__(
):
self.file_object = file_object
self.file_path = file_path # type: ignore[var-annotated]
self._append = append
self._read = read
self.allow_update = update
self.open_modes = open_modes
self.allow_update = allow_update
self._closefd = closefd
self._file_epoch = file_object.epoch
self.raw_io = raw_io
Expand Down Expand Up @@ -801,8 +799,8 @@ def __init__(
self._flush_pos = 0
if contents:
self._flush_pos = len(contents)
if update:
if not append:
if self.allow_update:
if not self.open_modes.append:
self._io.seek(0)
else:
self._io.seek(self._flush_pos)
Expand Down Expand Up @@ -890,6 +888,21 @@ def closed(self) -> bool:
"""Simulate the `closed` attribute on file."""
return not self._is_open()

@property
def mode(self) -> str:
if self.open_modes.append:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to clarify how I discovered this bug - it might help you better handle different values for the mode parameter:I came across this code in the imp module (imp.load_source) (Python 3.9):

class _HackedGetData:
    """Compatibility support for 'file' arguments of various load_*()
    functions."""

    def __init__(self, fullname, path, file=None):
        super().__init__(fullname, path)
        self.file = file

    def get_data(self, path):
        """Gross hack to contort loader to deal w/ load_*()'s bad API."""
        if self.file and path == self.path:
            # The contract of get_data() requires us to return bytes. Reopen the
            # file in binary mode if needed.
            if not self.file.closed:
                file = self.file
                if 'b' not in file.mode:
                    file.close()
            if self.file.closed:
                self.file = file = open(self.path, 'rb')

            with file:
                return file.read()
        else:
            return super().get_data(path)

This code relies on checking for "b" (binary) mode.
For completeness, it might make sense to add these values too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good point - I forgot that!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, added the check for binary mode, please check!

Copy link

@evgenii-moriakhin evgenii-moriakhin Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from doc https://docs.python.org/3/library/functions.html#open

Modes 'r+' and 'r+b' open the file with no truncation.

in the current code here

       if self.open_modes.truncate:
            if self._binary:
                return "rb+" if self.open_modes.can_read else "wb"

this is probably a typo

although I see a separate test on r+b below, and maybe I haven't studied how the truncate flag works in the fake file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these tests are run both in the real and in the fake fs, so no, this is not a typo - I just reproduced the behavior in the real file system, though that was unexpected to me, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tested this locally, though I haven't found an explanation for this behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'm merging this then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I've been thinking about the r+/w+ behavior, and I think that this is because while there is a difference on opening with r+ and w+ (w+ will truncate the file, r+ will not), after the fie has been opened, the behavior is exactly the same for both modes (e.g. both allow reading and writing). Returning the mode in both cases as "r+" may be a design decision.

m = "ab" if self._binary else "a"
return m + "+" if self.open_modes.can_read else m
if self.open_modes.truncate:
if self._binary:
return "rb+" if self.open_modes.can_read else "wb"
return "w+" if self.open_modes.can_read else "w"
if self.open_modes.must_not_exist:
m = "xb" if self._binary else "x"
return m + "+" if self.open_modes.can_read else m
m = "rb" if self._binary else "r"
return m + "+" if self.open_modes.can_write else m

def _try_flush(self, old_pos: int) -> None:
"""Try to flush and reset the position if it fails."""
flush_pos = self._flush_pos
Expand All @@ -910,7 +923,7 @@ def flush(self) -> None:
self._check_open_file()

if self.allow_update:
if self._append:
if self.open_modes.append:
contents = self._io.getvalue()
self._sync_io()
old_contents = self.file_object.byte_contents
Expand Down Expand Up @@ -951,14 +964,14 @@ def _flush_related_files(self) -> None:
open_file is not self
and isinstance(open_file, FakeFileWrapper)
and self.file_object == open_file.file_object
and not open_file._append
and not open_file.open_modes.append
):
open_file._sync_io()

def seek(self, offset: int, whence: int = 0) -> None:
"""Move read/write pointer in 'file'."""
self._check_open_file()
if not self._append:
if not self.open_modes.append:
self._io.seek(offset, whence)
else:
self._read_seek = offset
Expand All @@ -976,7 +989,7 @@ def tell(self) -> int:
if not self.is_stream:
self.flush()

if not self._append:
if not self.open_modes.append:
return self._io.tell()
if self._read_whence:
write_seek = self._io.tell()
Expand All @@ -1001,7 +1014,7 @@ def _set_stream_contents(self, contents: bytes) -> None:
self._io.seek(0)
self._io.truncate()
self._io.putvalue(contents)
if not self._append:
if not self.open_modes.append:
self._io.seek(whence)

def _read_wrappers(self, name: str) -> Callable:
Expand Down Expand Up @@ -1117,7 +1130,7 @@ def write_wrapper(*args, **kwargs):
self._try_flush(old_pos)
if not flush_all:
ret_value = io_attr(*args, **kwargs)
if self._append:
if self.open_modes.append:
self._read_seek = self._io.tell()
self._read_whence = 0
return ret_value
Expand All @@ -1132,7 +1145,7 @@ def _adapt_size_for_related_files(self, size: int) -> None:
open_file is not self
and isinstance(open_file, FakeFileWrapper)
and self.file_object == open_file.file_object
and cast(FakeFileWrapper, open_file)._append
and cast(FakeFileWrapper, open_file).open_modes.append
):
open_file._read_seek += size

Expand All @@ -1146,7 +1159,7 @@ def _truncate_wrapper(self) -> Callable:

def truncate_wrapper(*args, **kwargs):
"""Wrap truncate call to call flush after truncate."""
if self._append:
if self.open_modes.append:
self._io.seek(self._read_seek, self._read_whence)
size = io_attr(*args, **kwargs)
self.flush()
Expand Down Expand Up @@ -1179,7 +1192,7 @@ def __getattr__(self, name: str) -> Any:

if reading or writing:
self._check_open_file()
if not self._read and reading:
if not self.open_modes.can_read and reading:
return self._read_error()
if not self.opened_as_fd and not self.allow_update and writing:
return self._write_error()
Expand All @@ -1192,7 +1205,7 @@ def __getattr__(self, name: str) -> Any:
self.file_object.st_atime = helpers.now()
if truncate:
return self._truncate_wrapper()
if self._append:
if self.open_modes.append:
if reading:
return self._read_wrappers(name)
elif not writing:
Expand Down Expand Up @@ -1234,12 +1247,12 @@ def _check_open_file(self) -> None:
raise ValueError("I/O operation on closed file")

def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]:
if not self._read:
if not self.open_modes.can_read:
self._raise("File is not open for reading")
return self._io.__iter__()

def __next__(self):
if not self._read:
if not self.open_modes.can_read:
self._raise("File is not open for reading")
return next(self._io)

Expand Down
7 changes: 3 additions & 4 deletions pyfakefs/fake_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def call(
flushed if buffer size is exceeded. The default (-1) uses a
system specific default buffer size. Text line mode (e.g.
buffering=1 in text mode) is not supported.
encoding: The encoding used to encode unicode strings / decode
encoding: The encoding used to encode Unicode strings / decode
bytes.
errors: (str) Defines how encoding errors are handled.
newline: Controls universal newlines, passed to stream object.
Expand Down Expand Up @@ -270,9 +270,8 @@ def call(
fakefile = FakeFileWrapper(
file_object,
file_path,
update=open_modes.can_write and can_write,
read=open_modes.can_read,
append=open_modes.append,
open_modes=open_modes,
allow_update=open_modes.can_write and can_write,
delete_on_close=self._delete_on_close,
filesystem=self.filesystem,
newline=newline,
Expand Down
62 changes: 62 additions & 0 deletions pyfakefs/tests/fake_open_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,10 @@ def test_exclusive_create_binary_file(self):
self.os.mkdir(file_dir)
contents = b"Binary contents"
with self.open(file_path, "xb") as fake_file:
self.assertEqual("xb", fake_file.mode)
fake_file.write(contents)
with self.open(file_path, "rb") as fake_file:
self.assertEqual("rb", fake_file.mode)
self.assertEqual(contents, fake_file.read())

def test_overwrite_existing_file(self):
Expand Down Expand Up @@ -302,11 +304,27 @@ def test_open_with_wplus(self):
self.assertTrue(self.os.path.exists(file_path))
with self.open(file_path, "r", encoding="utf8") as fake_file:
self.assertEqual("old contents", fake_file.read())
self.assertEqual("r", fake_file.mode)
# actual tests
with self.open(file_path, "w+", encoding="utf8") as fake_file:
fake_file.write("new contents")
fake_file.seek(0)
self.assertTrue("new contents", fake_file.read())
self.assertEqual("w+", fake_file.mode)

def test_open_with_wplus_binary(self):
file_path = self.make_path("wplus_file_b")
self.create_file(file_path, contents=b"old contents")
self.assertTrue(self.os.path.exists(file_path))
with self.open(file_path, "rb") as fake_file:
self.assertEqual(b"old contents", fake_file.read())
self.assertEqual("rb", fake_file.mode)
# actual tests
with self.open(file_path, "wb+") as fake_file:
fake_file.write(b"new contents")
fake_file.seek(0)
self.assertTrue(b"new contents", fake_file.read())
self.assertEqual("rb+", fake_file.mode)

def test_open_with_wplus_truncation(self):
# set up
Expand All @@ -319,6 +337,7 @@ def test_open_with_wplus_truncation(self):
with self.open(file_path, "w+", encoding="utf8") as fake_file:
fake_file.seek(0)
self.assertEqual("", fake_file.read())
self.assertEqual("w+", fake_file.mode)

def test_open_with_append_flag(self):
contents = [
Expand All @@ -335,6 +354,7 @@ def test_open_with_append_flag(self):
fake_file.read(0)
with self.assertRaises(io.UnsupportedOperation):
fake_file.readline()
self.assertEqual("a", fake_file.mode)
expected_len = len("".join(contents))
expected_len += len(contents) * (len(self.os.linesep) - 1)
self.assertEqual(expected_len, fake_file.tell())
Expand All @@ -344,6 +364,22 @@ def test_open_with_append_flag(self):
with self.open(file_path, encoding="utf8") as fake_file:
self.assertEqual(contents + additional_contents, fake_file.readlines())

def test_open_with_append_flag_binary(self):
contents = b"Just some boring stuff... "
additional_contents = b"some excitement added"
file_path = self.make_path("append-binary")
self.create_file(file_path, contents=contents)
with self.open(file_path, "ab") as fake_file:
with self.assertRaises(io.UnsupportedOperation):
fake_file.read(0)
self.assertEqual("ab", fake_file.mode)
self.assertEqual(len(contents), fake_file.tell())
fake_file.seek(0)
self.assertEqual(0, fake_file.tell())
fake_file.write(additional_contents)
with self.open(file_path, "rb") as fake_file:
self.assertEqual(contents + additional_contents, fake_file.read())

def check_append_with_aplus(self):
file_path = self.make_path("aplus_file")
self.create_file(file_path, contents="old contents")
Expand All @@ -357,6 +393,7 @@ def check_append_with_aplus(self):
self.filesystem, delete_on_close=True
)
with self.open(file_path, "a+", encoding="utf8") as fake_file:
self.assertEqual("a+", fake_file.mode)
self.assertEqual(12, fake_file.tell())
fake_file.write("new contents")
self.assertEqual(24, fake_file.tell())
Expand Down Expand Up @@ -391,6 +428,12 @@ def test_read_empty_file_with_aplus(self):
with self.open(file_path, "a+", encoding="utf8") as fake_file:
self.assertEqual("", fake_file.read())

def test_read_empty_file_with_aplus_binary(self):
file_path = self.make_path("aplus_file")
with self.open(file_path, "ab+") as fake_file:
self.assertEqual(b"", fake_file.read())
self.assertEqual("ab+", fake_file.mode)

def test_read_with_rplus(self):
# set up
file_path = self.make_path("rplus_file")
Expand All @@ -401,11 +444,27 @@ def test_read_with_rplus(self):
# actual tests
with self.open(file_path, "r+", encoding="utf8") as fake_file:
self.assertEqual("old contents here", fake_file.read())
self.assertEqual("r+", fake_file.mode)
fake_file.seek(0)
fake_file.write("new contents")
fake_file.seek(0)
self.assertEqual("new contents here", fake_file.read())

def test_read_with_rplus_binary(self):
file_path = self.make_path("rplus_binary")
self.create_file(file_path, contents=b"old contents here")
self.assertTrue(self.os.path.exists(file_path))
with self.open(file_path, "rb") as fake_file:
self.assertEqual(b"old contents here", fake_file.read())

with self.open(file_path, "rb+") as fake_file:
self.assertEqual(b"old contents here", fake_file.read())
self.assertEqual("rb+", fake_file.mode)
fake_file.seek(0)
fake_file.write(b"new contents")
fake_file.seek(0)
self.assertEqual(b"new contents here", fake_file.read())

def create_with_permission(self, file_path, perm_bits):
self.create_file(file_path)
self.os.chmod(file_path, perm_bits)
Expand Down Expand Up @@ -900,6 +959,7 @@ def test_closing_file_with_different_close_mode(self):
file_obj = self.filesystem.get_object(filename)
with self.open(fd, "wb", closefd=False) as fp:
fp.write(b"test")
self.assertEqual("wb", fp.mode)
self.assertTrue(self.filesystem.has_open_file(file_obj))
self.os.close(fd)
self.assertFalse(self.filesystem.has_open_file(file_obj))
Expand Down Expand Up @@ -1028,6 +1088,7 @@ def test_use_opener_with_exclusive_write(self):

file_path = self.make_path("bar")
with self.open(file_path, "x", encoding="utf8", opener=self.opener) as f:
self.assertEqual("x", f.mode)
assert f.write("bar") == 3
with self.assertRaises(OSError):
f.read()
Expand All @@ -1042,6 +1103,7 @@ def test_use_opener_with_exclusive_plus(self):

file_path = self.make_path("bar")
with self.open(file_path, "x+", encoding="utf8", opener=self.opener) as f:
self.assertEqual("x+", f.mode)
assert f.write("bar") == 3
assert f.read() == ""
with self.open(file_path, encoding="utf8") as f:
Expand Down
Loading