diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py index 501f852059010f..504f9bb0c76f9a 100644 --- a/Lib/pathlib/_abc.py +++ b/Lib/pathlib/_abc.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod from glob import _PathGlobber, _no_recurse_symlinks from pathlib import PurePath, Path -from pathlib._os import magic_open, CopyReader, CopyWriter +from pathlib._os import magic_open, CopyWriter def _explode_path(path): @@ -353,8 +353,6 @@ def readlink(self): """ raise NotImplementedError - _copy_reader = property(CopyReader) - def copy(self, target, follow_symlinks=True, dirs_exist_ok=False, preserve_metadata=False): """ diff --git a/Lib/pathlib/_local.py b/Lib/pathlib/_local.py index 2f0c87fd27b624..bd1e871ebcfad7 100644 --- a/Lib/pathlib/_local.py +++ b/Lib/pathlib/_local.py @@ -19,7 +19,7 @@ except ImportError: grp = None -from pathlib._os import LocalCopyReader, LocalCopyWriter, PathInfo, DirEntryInfo +from pathlib._os import LocalCopyWriter, PathInfo, DirEntryInfo, ensure_different_files __all__ = [ @@ -1079,7 +1079,6 @@ def replace(self, target): os.replace(self, target) return self.with_segments(target) - _copy_reader = property(LocalCopyReader) _copy_writer = property(LocalCopyWriter) def copy(self, target, follow_symlinks=True, dirs_exist_ok=False, @@ -1125,7 +1124,7 @@ def move(self, target): else: if not hasattr(target, '_copy_writer'): target = self.with_segments(target_str) - target._copy_writer._ensure_different_file(self) + ensure_different_files(self, target) try: os.replace(self, target_str) return target diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py index c0c81ada858cdf..e5b894b524ca7e 100644 --- a/Lib/pathlib/_os.py +++ b/Lib/pathlib/_os.py @@ -200,26 +200,6 @@ def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None, raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}") -class CopyReader: - """ - Class that implements the "read" part of copying between path objects. - An instance of this class is available from the ReadablePath._copy_reader - property. - """ - __slots__ = ('_path',) - - def __init__(self, path): - self._path = path - - _readable_metakeys = frozenset() - - def _read_metadata(self, metakeys, *, follow_symlinks=True): - """ - Returns path metadata as a dict with string keys. - """ - raise NotImplementedError - - class CopyWriter: """ Class that implements the "write" part of copying between path objects. An @@ -231,48 +211,39 @@ class CopyWriter: def __init__(self, path): self._path = path - _writable_metakeys = frozenset() - - def _write_metadata(self, metadata, *, follow_symlinks=True): - """ - Sets path metadata from the given dict with string keys. - """ - raise NotImplementedError + def _copy_metadata(self, source, follow_symlinks=True): + """Copy metadata from the given path to our path.""" + pass def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata): - self._ensure_distinct_path(source) - if preserve_metadata: - metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys - else: - metakeys = None + ensure_distinct_paths(source, self._path) if not follow_symlinks and source.is_symlink(): - self._create_symlink(source, metakeys) + self._create_symlink(source, preserve_metadata) elif source.is_dir(): - self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok) + self._create_dir(source, follow_symlinks, dirs_exist_ok, preserve_metadata) else: - self._create_file(source, metakeys) + self._create_file(source, preserve_metadata) return self._path - def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok): + def _create_dir(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata): """Copy the given directory to our path.""" children = list(source.iterdir()) self._path.mkdir(exist_ok=dirs_exist_ok) for src in children: dst = self._path.joinpath(src.name) if not follow_symlinks and src.is_symlink(): - dst._copy_writer._create_symlink(src, metakeys) + dst._copy_writer._create_symlink(src, preserve_metadata) elif src.is_dir(): - dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok) + dst._copy_writer._create_dir(src, follow_symlinks, dirs_exist_ok, preserve_metadata) else: - dst._copy_writer._create_file(src, metakeys) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys) - if metadata: - self._write_metadata(metadata) + dst._copy_writer._create_file(src, preserve_metadata) - def _create_file(self, source, metakeys): + if preserve_metadata: + self._copy_metadata(source) + + def _create_file(self, source, preserve_metadata): """Copy the given file to our path.""" - self._ensure_different_file(source) + ensure_different_files(source, self._path) with magic_open(source, 'rb') as source_f: try: with magic_open(self._path, 'wb') as target_f: @@ -283,77 +254,34 @@ def _create_file(self, source, metakeys): raise FileNotFoundError( f'Directory does not exist: {self._path}') from e raise - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys) - if metadata: - self._write_metadata(metadata) + if preserve_metadata: + self._copy_metadata(source) - def _create_symlink(self, source, metakeys): + def _create_symlink(self, source, preserve_metadata): """Copy the given symbolic link to our path.""" self._path.symlink_to(source.readlink()) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) - if metadata: - self._write_metadata(metadata, follow_symlinks=False) - - def _ensure_different_file(self, source): - """ - Raise OSError(EINVAL) if both paths refer to the same file. - """ - pass - - def _ensure_distinct_path(self, source): - """ - Raise OSError(EINVAL) if the other path is within this path. - """ - # Note: there is no straightforward, foolproof algorithm to determine - # if one directory is within another (a particularly perverse example - # would be a single network share mounted in one location via NFS, and - # in another location via CIFS), so we simply checks whether the - # other path is lexically equal to, or within, this path. - if source == self._path: - err = OSError(EINVAL, "Source and target are the same path") - elif source in self._path.parents: - err = OSError(EINVAL, "Source path is a parent of target path") - else: - return - err.filename = str(source) - err.filename2 = str(self._path) - raise err + if preserve_metadata: + self._copy_metadata(source, follow_symlinks=False) -class LocalCopyReader(CopyReader): - """This object implements the "read" part of copying local paths. Don't - try to construct it yourself. +def ensure_distinct_paths(source, target): """ - __slots__ = () - - _readable_metakeys = {'mode', 'times_ns'} - if hasattr(os.stat_result, 'st_flags'): - _readable_metakeys.add('flags') - if hasattr(os, 'listxattr'): - _readable_metakeys.add('xattrs') - _readable_metakeys = frozenset(_readable_metakeys) - - def _read_metadata(self, metakeys, *, follow_symlinks=True): - metadata = {} - if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys: - st = self._path.stat(follow_symlinks=follow_symlinks) - if 'mode' in metakeys: - metadata['mode'] = S_IMODE(st.st_mode) - if 'times_ns' in metakeys: - metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns - if 'flags' in metakeys: - metadata['flags'] = st.st_flags - if 'xattrs' in metakeys: - try: - metadata['xattrs'] = [ - (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) - for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] - except OSError as err: - if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): - raise - return metadata + Raise OSError(EINVAL) if the other path is within this path. + """ + # Note: there is no straightforward, foolproof algorithm to determine + # if one directory is within another (a particularly perverse example + # would be a single network share mounted in one location via NFS, and + # in another location via CIFS), so we simply checks whether the + # other path is lexically equal to, or within, this path. + if source == target: + err = OSError(EINVAL, "Source and target are the same path") + elif source in target.parents: + err = OSError(EINVAL, "Source path is a parent of target path") + else: + return + err.filename = str(source) + err.filename2 = str(target) + raise err class LocalCopyWriter(CopyWriter): @@ -362,42 +290,42 @@ class LocalCopyWriter(CopyWriter): """ __slots__ = () - _writable_metakeys = LocalCopyReader._readable_metakeys + def _copy_metadata(self, source, follow_symlinks=True): + """Copy metadata from the given path to our path.""" + target = self._path + info = source.info - def _write_metadata(self, metadata, *, follow_symlinks=True): - def _nop(*args, ns=None, follow_symlinks=None): - pass + copy_times_ns = ( + hasattr(info, '_access_time_ns') and + hasattr(info, '_mod_time_ns') and + (follow_symlinks or os.utime in os.supports_follow_symlinks)) + if copy_times_ns: + t0 = info._access_time_ns(follow_symlinks=follow_symlinks) + t1 = info._mod_time_ns(follow_symlinks=follow_symlinks) + os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks) - if follow_symlinks: - # use the real function if it exists - def lookup(name): - return getattr(os, name, _nop) - else: - # use the real function only if it exists - # *and* it supports follow_symlinks - def lookup(name): - fn = getattr(os, name, _nop) - if fn in os.supports_follow_symlinks: - return fn - return _nop - - times_ns = metadata.get('times_ns') - if times_ns is not None: - lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks) # We must copy extended attributes before the file is (potentially) # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. - xattrs = metadata.get('xattrs') - if xattrs is not None: + copy_xattrs = ( + hasattr(info, '_xattrs') and + hasattr(os, 'setxattr') and + (follow_symlinks or os.setxattr in os.supports_follow_symlinks)) + if copy_xattrs: + xattrs = info._xattrs(follow_symlinks=follow_symlinks) for attr, value in xattrs: try: - os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks) + os.setxattr(target, attr, value, follow_symlinks=follow_symlinks) except OSError as e: if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): raise - mode = metadata.get('mode') - if mode is not None: + + copy_posix_permissions = ( + hasattr(info, '_posix_permissions') and + (follow_symlinks or os.chmod in os.supports_follow_symlinks)) + if copy_posix_permissions: + posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks) try: - lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks) + os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks) except NotImplementedError: # if we got a NotImplementedError, it's because # * follow_symlinks=False, @@ -410,66 +338,146 @@ def lookup(name): # symlink. give up, suppress the error. # (which is what shutil always did in this circumstance.) pass - flags = metadata.get('flags') - if flags is not None: + + copy_bsd_flags = ( + hasattr(info, '_bsd_flags') and + hasattr(os, 'chflags') and + (follow_symlinks or os.chflags in os.supports_follow_symlinks)) + if copy_bsd_flags: + bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks) try: - lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks) + os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks) except OSError as why: if why.errno not in (EOPNOTSUPP, ENOTSUP): raise if copyfile: # Use fast OS routine for local file copying where available. - def _create_file(self, source, metakeys): + def _create_file(self, source, preserve_metadata): """Copy the given file to the given target.""" try: source = os.fspath(source) except TypeError: - super()._create_file(source, metakeys) + super()._create_file(source, preserve_metadata) else: copyfile(source, os.fspath(self._path)) if os.name == 'nt': # Windows: symlink target might not exist yet if we're copying several # files, so ensure we pass is_dir to os.symlink(). - def _create_symlink(self, source, metakeys): + def _create_symlink(self, source, preserve_metadata): """Copy the given symlink to the given target.""" self._path.symlink_to(source.readlink(), source.is_dir()) - if metakeys: - metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False) - if metadata: - self._write_metadata(metadata, follow_symlinks=False) + if preserve_metadata: + self._copy_metadata(source, follow_symlinks=False) - def _ensure_different_file(self, source): - """ - Raise OSError(EINVAL) if both paths refer to the same file. - """ + +def ensure_different_files(source, target): + """ + Raise OSError(EINVAL) if both paths refer to the same file. + """ + try: + source_file_id = source.info._file_id + target_file_id = target.info._file_id + except AttributeError: + if source != target: + return + else: try: - if not self._path.samefile(source): + if source_file_id() != target_file_id(): return except (OSError, ValueError): return - err = OSError(EINVAL, "Source and target are the same file") - err.filename = str(source) - err.filename2 = str(self._path) - raise err + err = OSError(EINVAL, "Source and target are the same file") + err.filename = str(source) + err.filename2 = str(target) + raise err class _PathInfoBase: - __slots__ = () + __slots__ = ('_path', '_stat_result', '_lstat_result') + + def __init__(self, path): + self._path = str(path) def __repr__(self): path_type = "WindowsPath" if os.name == "nt" else "PosixPath" return f"<{path_type}.info>" + def _stat(self, *, follow_symlinks=True, ignore_errors=False): + """Return the status as an os.stat_result, or None if stat() fails and + ignore_errors is true.""" + if follow_symlinks: + try: + result = self._stat_result + except AttributeError: + pass + else: + if ignore_errors or result is not None: + return result + try: + self._stat_result = os.stat(self._path) + except (OSError, ValueError): + self._stat_result = None + if not ignore_errors: + raise + return self._stat_result + else: + try: + result = self._lstat_result + except AttributeError: + pass + else: + if ignore_errors or result is not None: + return result + try: + self._lstat_result = os.lstat(self._path) + except (OSError, ValueError): + self._lstat_result = None + if not ignore_errors: + raise + return self._lstat_result + + def _posix_permissions(self, *, follow_symlinks=True): + """Return the POSIX file permissions.""" + return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode) + + def _file_id(self, *, follow_symlinks=True): + """Returns the identifier of the file.""" + st = self._stat(follow_symlinks=follow_symlinks) + return st.st_dev, st.st_ino + + def _access_time_ns(self, *, follow_symlinks=True): + """Return the access time in nanoseconds.""" + return self._stat(follow_symlinks=follow_symlinks).st_atime_ns + + def _mod_time_ns(self, *, follow_symlinks=True): + """Return the modify time in nanoseconds.""" + return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns + + if hasattr(os.stat_result, 'st_flags'): + def _bsd_flags(self, *, follow_symlinks=True): + """Return the flags.""" + return self._stat(follow_symlinks=follow_symlinks).st_flags + + if hasattr(os, 'listxattr'): + def _xattrs(self, *, follow_symlinks=True): + """Return the xattrs as a list of (attr, value) pairs, or an empty + list if extended attributes aren't supported.""" + try: + return [ + (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) + for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] + except OSError as err: + if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): + raise + return [] + class _WindowsPathInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information for Windows paths. Don't try to construct it yourself.""" - __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink') - - def __init__(self, path): - self._path = str(path) + __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink') def exists(self, *, follow_symlinks=True): """Whether this path exists.""" @@ -525,44 +533,35 @@ def is_symlink(self): class _PosixPathInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information for POSIX paths. Don't try to construct it yourself.""" - __slots__ = ('_path', '_mode') - - def __init__(self, path): - self._path = str(path) - self._mode = [None, None] - - def _get_mode(self, *, follow_symlinks=True): - idx = bool(follow_symlinks) - mode = self._mode[idx] - if mode is None: - try: - st = os.stat(self._path, follow_symlinks=follow_symlinks) - except (OSError, ValueError): - mode = 0 - else: - mode = st.st_mode - if follow_symlinks or S_ISLNK(mode): - self._mode[idx] = mode - else: - # Not a symlink, so stat() will give the same result - self._mode = [mode, mode] - return mode + __slots__ = () def exists(self, *, follow_symlinks=True): """Whether this path exists.""" - return self._get_mode(follow_symlinks=follow_symlinks) > 0 + st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) + if st is None: + return False + return True def is_dir(self, *, follow_symlinks=True): """Whether this path is a directory.""" - return S_ISDIR(self._get_mode(follow_symlinks=follow_symlinks)) + st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) + if st is None: + return False + return S_ISDIR(st.st_mode) def is_file(self, *, follow_symlinks=True): """Whether this path is a regular file.""" - return S_ISREG(self._get_mode(follow_symlinks=follow_symlinks)) + st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) + if st is None: + return False + return S_ISREG(st.st_mode) def is_symlink(self): """Whether this path is a symbolic link.""" - return S_ISLNK(self._get_mode(follow_symlinks=False)) + st = self._stat(follow_symlinks=False, ignore_errors=True) + if st is None: + return False + return S_ISLNK(st.st_mode) PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo @@ -572,25 +571,25 @@ class DirEntryInfo(_PathInfoBase): """Implementation of pathlib.types.PathInfo that provides status information by querying a wrapped os.DirEntry object. Don't try to construct it yourself.""" - __slots__ = ('_entry', '_exists') + __slots__ = ('_entry',) def __init__(self, entry): + super().__init__(entry.path) self._entry = entry + def _stat(self, *, follow_symlinks=True, ignore_errors=False): + try: + return self._entry.stat(follow_symlinks=follow_symlinks) + except OSError: + if not ignore_errors: + raise + return None + def exists(self, *, follow_symlinks=True): """Whether this path exists.""" if not follow_symlinks: return True - try: - return self._exists - except AttributeError: - try: - self._entry.stat() - except OSError: - self._exists = False - else: - self._exists = True - return self._exists + return self._stat(ignore_errors=True) is not None def is_dir(self, *, follow_symlinks=True): """Whether this path is a directory.""" diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 7f61f3d6223198..8900b292ad0e46 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -1440,11 +1440,13 @@ def test_copy_dir_preserve_metadata(self): if hasattr(os, 'chflags') and hasattr(stat, 'UF_NODUMP'): os.chflags(source / 'fileC', stat.UF_NODUMP) target = base / 'copyA' + + subpaths = ['.', 'fileC', 'dirD', 'dirD/fileD'] + source_sts = [source.joinpath(subpath).stat() for subpath in subpaths] source.copy(target, preserve_metadata=True) + target_sts = [target.joinpath(subpath).stat() for subpath in subpaths] - for subpath in ['.', 'fileC', 'dirD', 'dirD/fileD']: - source_st = source.joinpath(subpath).stat() - target_st = target.joinpath(subpath).stat() + for source_st, target_st in zip(source_sts, target_sts): self.assertLessEqual(source_st.st_atime, target_st.st_atime) self.assertLessEqual(source_st.st_mtime, target_st.st_mtime) self.assertEqual(source_st.st_mode, target_st.st_mode)