Skip to content

fix: rpm extractor for windows #1696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 15, 2022
28 changes: 22 additions & 6 deletions cve_bin_tool/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,16 @@ async def extract_file_rpm(self, filename, extraction_path):
if stderr or not stdout:
return 1
filenames = await aio_glob(os.path.join(extraction_path, "*.cpio"))
if not filenames:
filenames = await aio_glob(
os.path.join(extraction_path, "*.cpio.zstd")
)
filename = filenames[0]
exit_code = await self.extract_file_zst(filename, extraction_path)
if exit_code:
return 1
filenames = await aio_glob(os.path.join(extraction_path, "*.cpio"))
filename = filenames[0]

stdout, stderr, _ = await aio_run_command(["7z", "x", filename])
if stderr or not stdout:
return 1
Expand All @@ -118,11 +126,19 @@ async def extract_file_zst(self, filename: str, extraction_path: str) -> int:

dctx = zstandard.ZstdDecompressor()
with ErrorHandler(mode=ErrorMode.Ignore) as e:
with open(filename, "rb") as f:
with dctx.stream_reader(f) as reader:
with tarfile.open(mode="r|", fileobj=reader) as tar:
tar.extractall(extraction_path)
tar.close()
if filename.endswith(".cpio.zstd"):
with open(filename, "rb") as compressed:
base = os.path.basename(filename)
file = os.path.splitext(base)[0]
output_path = os.path.join(extraction_path, file)
with open(output_path, "wb") as destination:
dctx.copy_stream(compressed, destination)
else:
with open(filename, "rb") as f:
with dctx.stream_reader(f) as reader:
with tarfile.open(mode="r|", fileobj=reader) as tar:
tar.extractall(extraction_path)
tar.close()
return e.exit_code

async def extract_file_pkg(self, filename: str, extraction_path: str) -> int:
Expand Down
26 changes: 26 additions & 0 deletions test/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
# import logging
# logging.basicConfig(level=logging.DEBUG)

DOVECOT_FILE_PATH = path.join(
path.abspath(path.dirname(__file__)),
"condensed-downloads",
"dovecot-2.3.14-1.fc34.i686.rpm",
)
CAB_TEST_FILE_PATH = path.join(
path.abspath(path.dirname(__file__)), "assets", "cab-test-python3.8.cab"
)
Expand Down Expand Up @@ -140,6 +145,27 @@ async def test_extract_file_rpm_no_rpm2cipo(self, extension_list: List[str]):
mock_aio_run_command.assert_not_called()


class TestExtractFileRpmWithZstd(TestExtractorBase):
"""Tests for the rpm file extractor (zstd/windows)"""

@classmethod
def setup_class(cls):
super().setup_class()
shutil.copyfile(DOVECOT_FILE_PATH, path.join(cls.tempdir, "test.rpm"))

@pytest.fixture
def extension_list(self) -> List[str]:
return self.extractor.file_extractors[self.extractor.extract_file_rpm]

@pytest.mark.asyncio
async def test_extract_file_rpm(self, extension_list: List[str]):
"""Test the rpm file extraction in windows with zstd"""
async for extracted_path in self.extract_files(
[f"test{extension}" for extension in extension_list]
):
assert path.isfile(path.join(extracted_path, "usr", "sbin", "dovecot"))


class TestExtractFileDeb(TestExtractorBase):
"""Tests for deb file extractor"""

Expand Down