|
8 | 8 | import re
|
9 | 9 | import shutil
|
10 | 10 | import sys
|
| 11 | +import tarfile |
11 | 12 | import tempfile
|
12 | 13 | from pathlib import Path
|
13 | 14 |
|
@@ -73,11 +74,42 @@ def can_extract(self, filename):
|
73 | 74 | return True
|
74 | 75 | return False
|
75 | 76 |
|
76 |
| - @staticmethod |
77 |
| - async def extract_file_tar(filename, extraction_path): |
| 77 | + def tar_member_filter(self, members, extraction_path): |
| 78 | + """Generator function to serve as a backported filter for tarfile extraction |
| 79 | + based on https://docs.python.org/3/library/tarfile.html#examples |
| 80 | + """ |
| 81 | + for tarmember in members: |
| 82 | + if tarmember.isfile() and str( |
| 83 | + Path(extraction_path, tarmember.name).resolve() |
| 84 | + ).startsWith(extraction_path): |
| 85 | + yield tarmember |
| 86 | + |
| 87 | + async def extract_file_tar(self, filename, extraction_path): |
78 | 88 | """Extract tar files"""
|
| 89 | + |
| 90 | + # make sure we have full path for later checks |
| 91 | + extraction_path = str(Path(extraction_path).resolve()) |
79 | 92 | with ErrorHandler(mode=ErrorMode.Ignore) as e:
|
80 |
| - await aio_unpack_archive(filename, extraction_path) |
| 93 | + tar = tarfile.open(filename) |
| 94 | + # Python 3.12 has a data filter we can use in extract |
| 95 | + # tarfile has this available in older versions as well |
| 96 | + if hasattr(tarfile, "data_filter"): |
| 97 | + tar.extractall(path=extraction_path, filter="data") # nosec |
| 98 | + # nosec line because bandit doesn't understand filters yet |
| 99 | + |
| 100 | + # FIXME: the backported fix is not working on windows. |
| 101 | + # this leaves the current (unsafe) behaviour so we can fix at least one OS for now |
| 102 | + elif sys.platform == "win32": |
| 103 | + tar.extractall(path=extraction_path) # nosec |
| 104 | + |
| 105 | + # Some versions may need us to implement a filter to avoid unsafe behaviour |
| 106 | + # we could consider logging a warning here |
| 107 | + else: |
| 108 | + tar.extractall( |
| 109 | + path=extraction_path, |
| 110 | + members=self.tar_member_filter(tar, extraction_path), |
| 111 | + ) # nosec |
| 112 | + tar.close() |
81 | 113 | return e.exit_code
|
82 | 114 |
|
83 | 115 | async def extract_file_rpm(self, filename, extraction_path):
|
|
0 commit comments