diff --git a/CHANGES.md b/CHANGES.md index 3ee6596a..3aeb5695 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -27,6 +27,7 @@ The released versions correspond to PyPI releases. (see [#1121](../../issues/1121)) * fixed workaround for recursion with pytest under Windows to ignore capitalization of pytest executable (see [#1096](../../issues/1096)) +* added missing `mode` property to fake file wrapper (see [#1162](../../issues/1096)) ### Infrastructure * adapt test for increased default buffer size in Python 3.14a6 diff --git a/pyfakefs/fake_file.py b/pyfakefs/fake_file.py index 6da47911..2b2fa8a4 100644 --- a/pyfakefs/fake_file.py +++ b/pyfakefs/fake_file.py @@ -751,9 +751,8 @@ def __init__( self, file_object: FakeFile, file_path: AnyStr, - update: bool, - read: bool, - append: bool, + open_modes: _OpenModes, + allow_update: bool, delete_on_close: bool, filesystem: "FakeFilesystem", newline: Optional[str], @@ -768,9 +767,8 @@ def __init__( ): self.file_object = file_object self.file_path = file_path # type: ignore[var-annotated] - self._append = append - self._read = read - self.allow_update = update + self.open_modes = open_modes + self.allow_update = allow_update self._closefd = closefd self._file_epoch = file_object.epoch self.raw_io = raw_io @@ -801,8 +799,8 @@ def __init__( self._flush_pos = 0 if contents: self._flush_pos = len(contents) - if update: - if not append: + if self.allow_update: + if not self.open_modes.append: self._io.seek(0) else: self._io.seek(self._flush_pos) @@ -890,6 +888,21 @@ def closed(self) -> bool: """Simulate the `closed` attribute on file.""" return not self._is_open() + @property + def mode(self) -> str: + if self.open_modes.append: + m = "ab" if self._binary else "a" + return m + "+" if self.open_modes.can_read else m + if self.open_modes.truncate: + if self._binary: + return "rb+" if self.open_modes.can_read else "wb" + return "w+" if self.open_modes.can_read else "w" + if self.open_modes.must_not_exist: + m = "xb" if self._binary else "x" + return m + "+" if self.open_modes.can_read else m + m = "rb" if self._binary else "r" + return m + "+" if self.open_modes.can_write else m + def _try_flush(self, old_pos: int) -> None: """Try to flush and reset the position if it fails.""" flush_pos = self._flush_pos @@ -910,7 +923,7 @@ def flush(self) -> None: self._check_open_file() if self.allow_update: - if self._append: + if self.open_modes.append: contents = self._io.getvalue() self._sync_io() old_contents = self.file_object.byte_contents @@ -951,14 +964,14 @@ def _flush_related_files(self) -> None: open_file is not self and isinstance(open_file, FakeFileWrapper) and self.file_object == open_file.file_object - and not open_file._append + and not open_file.open_modes.append ): open_file._sync_io() def seek(self, offset: int, whence: int = 0) -> None: """Move read/write pointer in 'file'.""" self._check_open_file() - if not self._append: + if not self.open_modes.append: self._io.seek(offset, whence) else: self._read_seek = offset @@ -976,7 +989,7 @@ def tell(self) -> int: if not self.is_stream: self.flush() - if not self._append: + if not self.open_modes.append: return self._io.tell() if self._read_whence: write_seek = self._io.tell() @@ -1001,7 +1014,7 @@ def _set_stream_contents(self, contents: bytes) -> None: self._io.seek(0) self._io.truncate() self._io.putvalue(contents) - if not self._append: + if not self.open_modes.append: self._io.seek(whence) def _read_wrappers(self, name: str) -> Callable: @@ -1117,7 +1130,7 @@ def write_wrapper(*args, **kwargs): self._try_flush(old_pos) if not flush_all: ret_value = io_attr(*args, **kwargs) - if self._append: + if self.open_modes.append: self._read_seek = self._io.tell() self._read_whence = 0 return ret_value @@ -1132,7 +1145,7 @@ def _adapt_size_for_related_files(self, size: int) -> None: open_file is not self and isinstance(open_file, FakeFileWrapper) and self.file_object == open_file.file_object - and cast(FakeFileWrapper, open_file)._append + and cast(FakeFileWrapper, open_file).open_modes.append ): open_file._read_seek += size @@ -1146,7 +1159,7 @@ def _truncate_wrapper(self) -> Callable: def truncate_wrapper(*args, **kwargs): """Wrap truncate call to call flush after truncate.""" - if self._append: + if self.open_modes.append: self._io.seek(self._read_seek, self._read_whence) size = io_attr(*args, **kwargs) self.flush() @@ -1179,7 +1192,7 @@ def __getattr__(self, name: str) -> Any: if reading or writing: self._check_open_file() - if not self._read and reading: + if not self.open_modes.can_read and reading: return self._read_error() if not self.opened_as_fd and not self.allow_update and writing: return self._write_error() @@ -1192,7 +1205,7 @@ def __getattr__(self, name: str) -> Any: self.file_object.st_atime = helpers.now() if truncate: return self._truncate_wrapper() - if self._append: + if self.open_modes.append: if reading: return self._read_wrappers(name) elif not writing: @@ -1234,12 +1247,12 @@ def _check_open_file(self) -> None: raise ValueError("I/O operation on closed file") def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: - if not self._read: + if not self.open_modes.can_read: self._raise("File is not open for reading") return self._io.__iter__() def __next__(self): - if not self._read: + if not self.open_modes.can_read: self._raise("File is not open for reading") return next(self._io) diff --git a/pyfakefs/fake_open.py b/pyfakefs/fake_open.py index d3dcd4e8..0bd90a8f 100644 --- a/pyfakefs/fake_open.py +++ b/pyfakefs/fake_open.py @@ -160,7 +160,7 @@ def call( flushed if buffer size is exceeded. The default (-1) uses a system specific default buffer size. Text line mode (e.g. buffering=1 in text mode) is not supported. - encoding: The encoding used to encode unicode strings / decode + encoding: The encoding used to encode Unicode strings / decode bytes. errors: (str) Defines how encoding errors are handled. newline: Controls universal newlines, passed to stream object. @@ -270,9 +270,8 @@ def call( fakefile = FakeFileWrapper( file_object, file_path, - update=open_modes.can_write and can_write, - read=open_modes.can_read, - append=open_modes.append, + open_modes=open_modes, + allow_update=open_modes.can_write and can_write, delete_on_close=self._delete_on_close, filesystem=self.filesystem, newline=newline, diff --git a/pyfakefs/tests/fake_open_test.py b/pyfakefs/tests/fake_open_test.py index fb495058..3f9bb4db 100644 --- a/pyfakefs/tests/fake_open_test.py +++ b/pyfakefs/tests/fake_open_test.py @@ -263,8 +263,10 @@ def test_exclusive_create_binary_file(self): self.os.mkdir(file_dir) contents = b"Binary contents" with self.open(file_path, "xb") as fake_file: + self.assertEqual("xb", fake_file.mode) fake_file.write(contents) with self.open(file_path, "rb") as fake_file: + self.assertEqual("rb", fake_file.mode) self.assertEqual(contents, fake_file.read()) def test_overwrite_existing_file(self): @@ -302,11 +304,27 @@ def test_open_with_wplus(self): self.assertTrue(self.os.path.exists(file_path)) with self.open(file_path, "r", encoding="utf8") as fake_file: self.assertEqual("old contents", fake_file.read()) + self.assertEqual("r", fake_file.mode) # actual tests with self.open(file_path, "w+", encoding="utf8") as fake_file: fake_file.write("new contents") fake_file.seek(0) self.assertTrue("new contents", fake_file.read()) + self.assertEqual("w+", fake_file.mode) + + def test_open_with_wplus_binary(self): + file_path = self.make_path("wplus_file_b") + self.create_file(file_path, contents=b"old contents") + self.assertTrue(self.os.path.exists(file_path)) + with self.open(file_path, "rb") as fake_file: + self.assertEqual(b"old contents", fake_file.read()) + self.assertEqual("rb", fake_file.mode) + # actual tests + with self.open(file_path, "wb+") as fake_file: + fake_file.write(b"new contents") + fake_file.seek(0) + self.assertTrue(b"new contents", fake_file.read()) + self.assertEqual("rb+", fake_file.mode) def test_open_with_wplus_truncation(self): # set up @@ -319,6 +337,7 @@ def test_open_with_wplus_truncation(self): with self.open(file_path, "w+", encoding="utf8") as fake_file: fake_file.seek(0) self.assertEqual("", fake_file.read()) + self.assertEqual("w+", fake_file.mode) def test_open_with_append_flag(self): contents = [ @@ -335,6 +354,7 @@ def test_open_with_append_flag(self): fake_file.read(0) with self.assertRaises(io.UnsupportedOperation): fake_file.readline() + self.assertEqual("a", fake_file.mode) expected_len = len("".join(contents)) expected_len += len(contents) * (len(self.os.linesep) - 1) self.assertEqual(expected_len, fake_file.tell()) @@ -344,6 +364,22 @@ def test_open_with_append_flag(self): with self.open(file_path, encoding="utf8") as fake_file: self.assertEqual(contents + additional_contents, fake_file.readlines()) + def test_open_with_append_flag_binary(self): + contents = b"Just some boring stuff... " + additional_contents = b"some excitement added" + file_path = self.make_path("append-binary") + self.create_file(file_path, contents=contents) + with self.open(file_path, "ab") as fake_file: + with self.assertRaises(io.UnsupportedOperation): + fake_file.read(0) + self.assertEqual("ab", fake_file.mode) + self.assertEqual(len(contents), fake_file.tell()) + fake_file.seek(0) + self.assertEqual(0, fake_file.tell()) + fake_file.write(additional_contents) + with self.open(file_path, "rb") as fake_file: + self.assertEqual(contents + additional_contents, fake_file.read()) + def check_append_with_aplus(self): file_path = self.make_path("aplus_file") self.create_file(file_path, contents="old contents") @@ -357,6 +393,7 @@ def check_append_with_aplus(self): self.filesystem, delete_on_close=True ) with self.open(file_path, "a+", encoding="utf8") as fake_file: + self.assertEqual("a+", fake_file.mode) self.assertEqual(12, fake_file.tell()) fake_file.write("new contents") self.assertEqual(24, fake_file.tell()) @@ -391,6 +428,12 @@ def test_read_empty_file_with_aplus(self): with self.open(file_path, "a+", encoding="utf8") as fake_file: self.assertEqual("", fake_file.read()) + def test_read_empty_file_with_aplus_binary(self): + file_path = self.make_path("aplus_file") + with self.open(file_path, "ab+") as fake_file: + self.assertEqual(b"", fake_file.read()) + self.assertEqual("ab+", fake_file.mode) + def test_read_with_rplus(self): # set up file_path = self.make_path("rplus_file") @@ -401,11 +444,27 @@ def test_read_with_rplus(self): # actual tests with self.open(file_path, "r+", encoding="utf8") as fake_file: self.assertEqual("old contents here", fake_file.read()) + self.assertEqual("r+", fake_file.mode) fake_file.seek(0) fake_file.write("new contents") fake_file.seek(0) self.assertEqual("new contents here", fake_file.read()) + def test_read_with_rplus_binary(self): + file_path = self.make_path("rplus_binary") + self.create_file(file_path, contents=b"old contents here") + self.assertTrue(self.os.path.exists(file_path)) + with self.open(file_path, "rb") as fake_file: + self.assertEqual(b"old contents here", fake_file.read()) + + with self.open(file_path, "rb+") as fake_file: + self.assertEqual(b"old contents here", fake_file.read()) + self.assertEqual("rb+", fake_file.mode) + fake_file.seek(0) + fake_file.write(b"new contents") + fake_file.seek(0) + self.assertEqual(b"new contents here", fake_file.read()) + def create_with_permission(self, file_path, perm_bits): self.create_file(file_path) self.os.chmod(file_path, perm_bits) @@ -900,6 +959,7 @@ def test_closing_file_with_different_close_mode(self): file_obj = self.filesystem.get_object(filename) with self.open(fd, "wb", closefd=False) as fp: fp.write(b"test") + self.assertEqual("wb", fp.mode) self.assertTrue(self.filesystem.has_open_file(file_obj)) self.os.close(fd) self.assertFalse(self.filesystem.has_open_file(file_obj)) @@ -1028,6 +1088,7 @@ def test_use_opener_with_exclusive_write(self): file_path = self.make_path("bar") with self.open(file_path, "x", encoding="utf8", opener=self.opener) as f: + self.assertEqual("x", f.mode) assert f.write("bar") == 3 with self.assertRaises(OSError): f.read() @@ -1042,6 +1103,7 @@ def test_use_opener_with_exclusive_plus(self): file_path = self.make_path("bar") with self.open(file_path, "x+", encoding="utf8", opener=self.opener) as f: + self.assertEqual("x+", f.mode) assert f.write("bar") == 3 assert f.read() == "" with self.open(file_path, encoding="utf8") as f: