Skip to content

feat: auto detect for vex and added linkage check #4415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ def main(argv=None):
total_files: int = 0
parsed_data: dict[ProductInfo, TriageData] = {}
vex_product_info: dict[str, str] = {}
sbom_serial_number = ""
# Package List parsing
if args["package_list"]:
sbom_root = args["package_list"]
Expand Down Expand Up @@ -1095,6 +1096,7 @@ def main(argv=None):
validate=not args["disable_validation_check"],
)
parsed_data = sbom_list.parse_sbom()
sbom_serial_number = sbom_list.serialNumber
LOGGER.info(
f"The number of products to process from SBOM - {len(parsed_data)}"
)
Expand All @@ -1103,10 +1105,10 @@ def main(argv=None):
cve_scanner.get_cves(product_info, triage_data)

if args["vex_file"]:
# for now use cyclonedx as auto detection is not implemented in latest pypi package of lib4vex
# use auto so that lib4vex can auto-detect the vex type.
vexdata = VEXParse(
filename=args["vex_file"],
vextype="cyclonedx",
vextype="auto",
logger=LOGGER,
)
parsed_vex_data = vexdata.parse_vex()
Expand All @@ -1122,9 +1124,14 @@ def main(argv=None):
LOGGER.info(
f"VEX file {args['vex_file']} is not a standalone file and will be used as a triage file"
)
# need to do validation on the sbom part
# need to implement is_linked() function which will check the linkage.
if args["sbom_file"]:
# check weather vex is linked with given sbom or not.
# only check cyclonedx since it have serialNumber.
if (
args["sbom_file"]
and args["sbom"] == "cyclonedx"
and vexdata.vextype == "cyclonedx"
and sbom_serial_number not in vexdata.serialNumbers
):
LOGGER.warning(
f"SBOM file: {args['sbom_file']} is not linked to VEX file: {args['vex_file']}."
)
Expand Down Expand Up @@ -1162,6 +1169,7 @@ def main(argv=None):
"release": args["release"],
"vendor": args["vendor"],
"revision_reason": args["revision_reason"],
"sbom_serial_number": sbom_serial_number,
}
elif args["vex_file"]:
vex_product_info["revision_reason"] = args["revision_reason"]
Expand Down
1 change: 1 addition & 0 deletions cve_bin_tool/output_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ def output_cves(self, outfile, output_type="console"):
self.vex_type,
self.all_cve_data,
self.vex_product_info["revision_reason"],
self.vex_product_info["sbom_serial_number"],
logger=self.logger,
)
vexgen.generate_vex()
Expand Down
21 changes: 21 additions & 0 deletions cve_bin_tool/sbom_manager/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
decode_cpe23,
find_product_location,
validate_location,
validate_serialNumber,
)
from cve_bin_tool.validator import validate_cyclonedx, validate_spdx, validate_swid

Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
self.type = sbom_type
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
self.validate = validate
self.serialNumber = ""

# Connect to the database
self.cvedb = CVEDB(version_check=False)
Expand Down Expand Up @@ -253,6 +255,25 @@ def parse_cyclonedx_spdx(self) -> [(str, str, str)]:
sbom_parser = SBOMParser(sbom_type=self.type)
# Load SBOM
sbom_parser.parse_file(self.filename)
doc = sbom_parser.get_document()
uuid = doc.get("uuid", "")
if self.type == "cyclonedx":
parts = uuid.split(":")
if len(parts) == 3 and parts[0] == "urn" and parts[1] == "uuid":
serialNumber = parts[2]
if validate_serialNumber(serialNumber):
self.serialNumber = serialNumber
else:
LOGGER.error(
f"The SBOM file '{self.filename}' has an invalid serial number."
)
return []
else:
LOGGER.error(
f"The SBOM file '{self.filename}' has an invalid serial number."
)
return []

modules = []
if self.validate and self.filename.endswith(".xml"):
# Only for XML files
Expand Down
32 changes: 29 additions & 3 deletions cve_bin_tool/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def decode_purl(purl: str) -> ProductInfo | None:
return None


def decode_bom_ref(ref: str) -> ProductInfo | None:
def decode_bom_ref(ref: str):
"""
Decodes the BOM reference for each component.

Expand All @@ -418,11 +418,29 @@ def decode_bom_ref(ref: str) -> ProductInfo | None:
urn_cdx = re.compile(
r"urn:cdx:(?P<bomSerialNumber>.*?)\/(?P<bom_version>.*?)#(?P<bom_ref>.*)"
)
urn_cdx_with_purl = re.compile(
r"urn:cdx:(?P<bomSerialNumber>[^/]+)\/(?P<bom_version>[^#]+)#(?P<purl>pkg:[^\s]+)"
)
location = "location/to/product"
match = urn_cbt_ext_ref.match(ref) or urn_cbt_ref.match(ref) or urn_cdx.match(ref)
match = (
urn_cdx_with_purl.match(ref)
or urn_cbt_ext_ref.match(ref)
or urn_cbt_ref.match(ref)
or urn_cdx.match(ref)
)
if match:
urn_dict = match.groupdict()
if "bom_ref" in urn_dict: # For urn_cdx match
if "purl" in urn_dict: # For urn_cdx_with_purl match
serialNumber = urn_dict["bomSerialNumber"]
product_info = decode_purl(urn_dict["purl"])
if not validate_serialNumber(serialNumber):
LOGGER.error(
f"The BOM link contains an invalid serial number: '{serialNumber}'"
)
return product_info
else:
return product_info, serialNumber
elif "bom_ref" in urn_dict: # For urn_cdx match
cdx_bom_ref = urn_dict["bom_ref"]
try:
product, version = cdx_bom_ref.rsplit("-", 1)
Expand Down Expand Up @@ -466,6 +484,14 @@ def validate_version(version: str) -> bool:
return re.search(cpe_regex, version) is not None


def validate_serialNumber(serialNumber: str) -> bool:
"""
Validates the serial number present in sbom
"""
pattern = r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
return re.match(pattern, serialNumber) is not None


class DirWalk:
"""
for filename in DirWalk('*.c').walk(roots):
Expand Down
9 changes: 7 additions & 2 deletions cve_bin_tool/vex_manager/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
vextype: str,
all_cve_data: Dict[ProductInfo, CVEData],
revision_reason: str = "",
sbom_serial_number: str = "",
sbom: Optional[str] = None,
logger: Optional[Logger] = None,
validate: bool = True,
Expand All @@ -62,6 +63,7 @@ def __init__(
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
self.validate = validate
self.all_cve_data = all_cve_data
self.sbom_serial_number = sbom_serial_number

def generate_vex(self) -> None:
"""
Expand Down Expand Up @@ -155,10 +157,13 @@ def __get_vulnerabilities(self) -> List[Vulnerability]:
else cve.remarks.name
)
# more details will be added using set_value()
bom_version = 1
ref = f"urn:cbt:{bom_version}/{vendor}#{product}:{version}"
if purl is None:
purl = f"pkg:generic/{vendor}/{product}@{version}"
bom_version = 1
if self.sbom_serial_number != "":
ref = f"urn:cdx:{self.sbom_serial_number}/{bom_version}#{purl}"
else:
ref = f"urn:cbt:{bom_version}/{vendor}#{product}:{version}"

vulnerability.set_value("purl", str(purl))
vulnerability.set_value("bom_link", ref)
Expand Down
19 changes: 16 additions & 3 deletions cve_bin_tool/vex_manager/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class VEXParse:
- vextype (str): The type of VEX file.
- logger: The logger object for logging messages.
- parsed_data: A dictionary to store the parsed data.
- serialNumbers: serialNumbers from the bom_link used to check linkage with sbom.

Methods:
- __init__(self, filename: str, vextype: str, logger=None): Initializes the VEXParse object.
Expand Down Expand Up @@ -60,11 +61,16 @@ def __init__(self, filename: str, vextype: str, logger=None):
self.vextype = vextype
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
self.parsed_data = {}
self.serialNumbers = set()

def parse_vex(self) -> DefaultDict[ProductInfo, TriageData]:
"""Parses the VEX file and extracts the necessary fields from the vulnerabilities."""
vexparse = VEXParser(vex_type=self.vextype)
vexparse.parse(self.filename)
if self.vextype == "auto":
self.vextype = vexparse.get_type()

self.logger.info(f"Parsed Vex File: {self.filename} of type: {self.vextype}")
self.logger.debug(f"VEX Vulnerabilities: {vexparse.get_vulnerabilities()}")
self.__process_vulnerabilities(vexparse.get_vulnerabilities())
self.__process_metadata(vexparse.get_metadata())
Expand Down Expand Up @@ -101,7 +107,6 @@ def __process_product(self, product) -> None:

def __process_vulnerabilities(self, vulnerabilities) -> None:
""" "processes the vulnerabilities and extracts the necessary fields from the vulnerability."""
# for now cyclonedx is supported with minor tweaks other will be supported later
for vuln in vulnerabilities:
# Extract necessary fields from the vulnerability
cve_id = vuln.get("id")
Expand All @@ -110,10 +115,18 @@ def __process_vulnerabilities(self, vulnerabilities) -> None:
response = vuln.get("remediation")
comments = vuln.get("comments")
severity = vuln.get("severity") # Severity is not available in Lib4VEX
# Decode the bom reference for cyclonedx something similar would be done for other formats
# Decode the bom reference for cyclonedx and purl for csaf and openvex
product_info = None
serialNumber = ""
if self.vextype == "cyclonedx":
product_info = decode_bom_ref(vuln.get("bom_link"))
decoded_ref = decode_bom_ref(vuln.get("bom_link"))
if isinstance(decoded_ref, tuple) and not isinstance(
decoded_ref, ProductInfo
):
product_info, serialNumber = decoded_ref
self.serialNumbers.add(serialNumber)
else:
product_info = decoded_ref
elif self.vextype in ["openvex", "csaf"]:
product_info = decode_purl(vuln.get("purl"))
if product_info:
Expand Down
Loading