Skip to content

Commit ff6b6b7

Browse files
authored
test(extractor): use all possible libraries to extract a file (#1720)
* test: test to use 7z to extract pkg files * test: using all extraction methods for apk and zip files * test: extract deb without any tool * test: extract cab without any tool * fix: add alternatives fti AyncMock to run on Python 3.7 * fix: fixed tests to run on Python 3.7
1 parent 1cbf89d commit ff6b6b7

File tree

3 files changed

+114
-11
lines changed

3 files changed

+114
-11
lines changed

cve_bin_tool/error_handler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ class ExtractionFailed(ValueError):
104104
"""Extraction fail"""
105105

106106

107+
class ExtractionToolNotFound(Exception):
108+
"""No tool found to extract files"""
109+
110+
107111
class UnknownArchiveType(ValueError):
108112
"""Unknown archive type"""
109113

@@ -192,6 +196,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
192196
AttemptedToWriteOutsideCachedir: 28,
193197
SHAMismatch: 29,
194198
ExtractionFailed: 30,
199+
ExtractionToolNotFound: 30,
195200
UnknownArchiveType: 31,
196201
UnknownConfigType: 32,
197202
CVEDataMissing: 33,

cve_bin_tool/extractor.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (C) 2021 Intel Corporation
1+
# Copyright (C) 2022 Intel Corporation
22
# SPDX-License-Identifier: GPL-3.0-or-later
33

44
"""
@@ -29,7 +29,13 @@
2929
run_coroutine,
3030
)
3131

32-
from .error_handler import ErrorHandler, ErrorMode, ExtractionFailed, UnknownArchiveType
32+
from .error_handler import (
33+
ErrorHandler,
34+
ErrorMode,
35+
ExtractionFailed,
36+
ExtractionToolNotFound,
37+
UnknownArchiveType,
38+
)
3339
from .log import LOGGER
3440

3541
# Run rpmfile in a thread
@@ -101,7 +107,7 @@ async def extract_file_rpm(self, filename, extraction_path):
101107
else:
102108
if not await aio_inpath("7z"):
103109
with ErrorHandler(mode=self.error_mode, logger=self.logger):
104-
raise Exception("7z is required to extract rpm files")
110+
raise ExtractionToolNotFound("7z is required to extract rpm files")
105111
else:
106112
stdout, stderr, _ = await aio_run_command(["7z", "x", filename])
107113
if stderr or not stdout:
@@ -179,7 +185,7 @@ async def extract_file_deb(self, filename, extraction_path):
179185
"""Extract debian packages"""
180186
if not await aio_inpath("ar"):
181187
with ErrorHandler(mode=self.error_mode, logger=self.logger):
182-
raise Exception("'ar' is required to extract deb files")
188+
raise ExtractionToolNotFound("'ar' is required to extract deb files")
183189
else:
184190
stdout, stderr, _ = await aio_run_command(["ar", "x", filename])
185191
if stderr:
@@ -226,12 +232,14 @@ async def extract_file_apk(self, filename, extraction_path):
226232
else:
227233
return await self.extract_file_zip(filename, extraction_path)
228234

229-
@staticmethod
230-
async def extract_file_cab(filename, extraction_path):
235+
async def extract_file_cab(self, filename, extraction_path):
231236
"""Extract cab files"""
232237
if sys.platform.startswith("linux"):
233238
if not await aio_inpath("cabextract"):
234-
raise Exception("'cabextract' is required to extract cab files")
239+
with ErrorHandler(mode=self.error_mode, logger=self.logger):
240+
raise ExtractionToolNotFound(
241+
"'cabextract' is required to extract cab files"
242+
)
235243
else:
236244
stdout, stderr, _ = await aio_run_command(
237245
["cabextract", "-d", extraction_path, filename]

test/test_extractor.py

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
# Copyright (C) 2021 Intel Corporation
1+
# Copyright (C) 2022 Intel Corporation
22
# SPDX-License-Identifier: GPL-3.0-or-later
33

44
""" CVE Binary Tool tests for the extractor function """
55
import shutil
6+
import sys
67
import tarfile
78
import tempfile
89
import unittest
910
import unittest.mock
11+
from asyncio import coroutine
1012
from io import BytesIO
1113
from pathlib import Path
1214
from test.utils import (
@@ -16,11 +18,13 @@
1618
TMUX_DEB,
1719
download_file,
1820
)
19-
from typing import List
21+
from typing import Dict, List
2022
from zipfile import ZipFile, ZipInfo
2123

2224
import pytest
25+
from pytest_mock import MockerFixture
2326

27+
from cve_bin_tool.error_handler import ERROR_CODES, ExtractionToolNotFound
2428
from cve_bin_tool.extractor import Extractor
2529
from cve_bin_tool.util import inpath
2630

@@ -179,9 +183,31 @@ def setup_method(self):
179183
def extension_list(self) -> List[str]:
180184
return self.extractor.file_extractors[self.extractor.extract_file_pkg]
181185

186+
@pytest.mark.parametrize(
187+
"inpath_return_values",
188+
(
189+
{"tar": True, "7z": False}, # use `tar` to extract
190+
{"tar": False, "7z": True}, # use `7z` to extract
191+
),
192+
)
182193
@pytest.mark.asyncio
183-
async def test_extract_file_pkg(self, extension_list: List[str]):
194+
async def test_extract_file_pkg(
195+
self,
196+
extension_list: List[str],
197+
inpath_return_values: Dict[str, bool],
198+
mocker: MockerFixture,
199+
):
184200
"""Test the pkg file extraction"""
201+
202+
if sys.version_info >= (3, 8):
203+
mock_func = mocker.AsyncMock(side_effect=inpath_return_values.get)
204+
else:
205+
mock_func = coroutine(
206+
mocker.MagicMock(side_effect=inpath_return_values.get)
207+
)
208+
209+
mocker.patch("cve_bin_tool.extractor.aio_inpath", mock_func)
210+
185211
async for extracted_path in self.extract_files(
186212
[f"test{extension}" for extension in extension_list]
187213
):
@@ -229,6 +255,26 @@ async def test_extract_file_deb(self, extension_list: List[str]):
229255
):
230256
assert (Path(extracted_path) / "usr" / "bin" / "tmux").is_file()
231257

258+
@pytest.mark.asyncio
259+
async def test_extract_file_deb_no_tool(
260+
self, extension_list: List[str], mocker: MockerFixture
261+
):
262+
"""Test the deb file extraction with no extraction tool"""
263+
264+
if sys.version_info >= (3, 8):
265+
mocker.patch("cve_bin_tool.extractor.aio_inpath", return_value=False)
266+
else:
267+
mocker.patch(
268+
"cve_bin_tool.extractor.aio_inpath",
269+
coroutine(mocker.Mock(return_value=False)),
270+
)
271+
272+
with pytest.raises(SystemExit) as e:
273+
async for _ in self.extract_files(
274+
[f"test{extension}" for extension in extension_list]
275+
):
276+
assert e.value.args[0] == ERROR_CODES[ExtractionToolNotFound]
277+
232278

233279
class TestExtractFileCab(TestExtractorBase):
234280
"""Tests for the cab file extractor"""
@@ -248,6 +294,26 @@ async def test_extract_file_cab(self, extension_list: List[str]):
248294
):
249295
assert (Path(extracted_path) / "usr" / "bin" / "python3.8").is_file()
250296

297+
@pytest.mark.asyncio
298+
async def test_extract_file_cab_no_cabextract(
299+
self, extension_list: List[str], mocker: MockerFixture
300+
):
301+
"""Test the cab file extraction with no extraction tool"""
302+
303+
if sys.version_info >= (3, 8):
304+
mocker.patch("cve_bin_tool.extractor.aio_inpath", return_value=False)
305+
else:
306+
mocker.patch(
307+
"cve_bin_tool.extractor.aio_inpath",
308+
coroutine(mocker.Mock(return_value=False)),
309+
)
310+
311+
with pytest.raises(SystemExit) as e:
312+
async for _ in self.extract_files(
313+
[f"test{extension}" for extension in extension_list]
314+
):
315+
assert e.value.args[0] == ERROR_CODES[ExtractionToolNotFound]
316+
251317

252318
class TestExtractFileZip(TestExtractorBase):
253319
"""Tests for the zip file extractor
@@ -268,9 +334,33 @@ def setup_method(self, extension_list: List[str]):
268334
with ZipFile(zippath, "w") as zipfile:
269335
zipfile.writestr(ZipInfo("test.txt"), "feedface")
270336

337+
@pytest.mark.parametrize(
338+
"inpath_return_values",
339+
(
340+
{"unzip": True, "7z": False, "zipinfo": False, "file": False},
341+
{"unzip": False, "7z": True, "zipinfo": False, "file": False},
342+
{"unzip": False, "7z": False, "zipinfo": True, "file": False},
343+
{"unzip": False, "7z": False, "zipinfo": False, "file": True},
344+
),
345+
)
271346
@pytest.mark.asyncio
272-
async def test_extract_file_zip(self, extension_list: List[str]):
347+
async def test_extract_file_zip(
348+
self,
349+
extension_list: List[str],
350+
inpath_return_values: Dict[str, bool],
351+
mocker: MockerFixture,
352+
):
273353
"""Test the zip file extraction"""
354+
355+
if sys.version_info >= (3, 8):
356+
mock_func = mocker.AsyncMock(side_effect=inpath_return_values.get)
357+
else:
358+
mock_func = coroutine(
359+
mocker.MagicMock(side_effect=inpath_return_values.get)
360+
)
361+
362+
mocker.patch("cve_bin_tool.extractor.aio_inpath", mock_func)
363+
274364
async for dir_path in self.extract_files(
275365
[f"test{extension}" for extension in extension_list]
276366
):

0 commit comments

Comments
 (0)