diff --git a/CHANGELOG.md b/CHANGELOG.md index a30bdf9437..c170af1745 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master (unreleased) +- Added configurable string length (`min_str_len`) across backends (#1303) + ### New Features ### Breaking Changes diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index f8918b8d8e..2bc71ebb12 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -42,6 +42,7 @@ ) from capa.features.freeze import is_freeze from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH logger = logging.getLogger(__name__) @@ -52,14 +53,16 @@ MATCH_JSON_OBJECT = b'{"' -def extract_file_strings(buf: bytes, **kwargs) -> Iterator[tuple[String, Address]]: +def extract_file_strings( + buf: bytes, min_str_len: int = DEFAULT_STRING_LENGTH, **kwargs +) -> Iterator[tuple[String, Address]]: """ extract ASCII and UTF-16 LE strings from file """ - for s in capa.features.extractors.strings.extract_ascii_strings(buf): + for s in capa.features.extractors.strings.extract_ascii_strings(buf, min_str_len=min_str_len): yield String(s.s), FileOffsetAddress(s.offset) - for s in capa.features.extractors.strings.extract_unicode_strings(buf): + for s in capa.features.extractors.strings.extract_unicode_strings(buf, min_str_len=min_str_len): yield String(s.s), FileOffsetAddress(s.offset) diff --git a/capa/features/extractors/dnfile/extractor.py b/capa/features/extractors/dnfile/extractor.py index 4b6694f57d..ce43c01c20 100644 --- a/capa/features/extractors/dnfile/extractor.py +++ b/capa/features/extractors/dnfile/extractor.py @@ -28,6 +28,7 @@ import capa.features.extractors.dnfile.function from capa.features.common import Feature from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from 
capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod from capa.features.extractors.base_extractor import ( BBHandle, @@ -82,8 +83,9 @@ def get_type(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: class DnfileFeatureExtractor(StaticFeatureExtractor): - def __init__(self, path: Path): + def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH): self.pe: dnfile.dnPE = dnfile.dnPE(str(path)) + self.min_str_len = min_str_len super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes())) # pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction @@ -92,9 +94,9 @@ def __init__(self, path: Path): # pre-compute these because we'll yield them at *every* scope. self.global_features: list[tuple[Feature, Address]] = [] - self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_format()) - self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe)) - self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe)) + self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_format(self.pe)) + self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(self.pe)) + self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(self.pe)) def get_base_address(self): return NO_ADDRESS @@ -103,7 +105,9 @@ def extract_global_features(self): yield from self.global_features def extract_file_features(self): - yield from capa.features.extractors.dnfile.file.extract_features(self.pe) + yield from capa.features.extractors.dnfile.file.extract_features( + ctx={"pe": self.pe, "min_str_len": self.min_str_len} + ) def get_functions(self) -> Iterator[FunctionHandle]: # create a method lookup table @@ -112,7 +116,13 @@ def get_functions(self) -> Iterator[FunctionHandle]: fh: FunctionHandle = FunctionHandle( address=DNTokenAddress(token), inner=method, - 
ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache}, + ctx={ + "pe": self.pe, + "calls_from": set(), + "calls_to": set(), + "cache": self.token_cache, + "min_str_len": self.min_str_len, + }, ) # method tokens should be unique diff --git a/capa/features/extractors/dnfile/file.py b/capa/features/extractors/dnfile/file.py index 7a723e8dca..d361d0ad27 100644 --- a/capa/features/extractors/dnfile/file.py +++ b/capa/features/extractors/dnfile/file.py @@ -25,37 +25,43 @@ from capa.features.address import Address -def extract_file_import_names(pe: dnfile.dnPE) -> Iterator[tuple[Import, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe=pe) +def extract_file_import_names(ctx) -> Iterator[tuple[Import, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe) -def extract_file_format(pe: dnfile.dnPE) -> Iterator[tuple[Format, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_format(pe=pe) +def extract_file_format(ctx) -> Iterator[tuple[Format, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_format(pe) -def extract_file_function_names(pe: dnfile.dnPE) -> Iterator[tuple[FunctionName, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_function_names(pe=pe) +def extract_file_function_names(ctx) -> Iterator[tuple[FunctionName, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_function_names(pe) -def extract_file_strings(pe: dnfile.dnPE) -> Iterator[tuple[String, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_strings(pe=pe) +def extract_file_strings(ctx) -> Iterator[tuple[String, Address]]: + yield from capa.features.extractors.dotnetfile.extract_file_strings(ctx) -def 
extract_file_mixed_mode_characteristic_features(pe: dnfile.dnPE) -> Iterator[tuple[Characteristic, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_mixed_mode_characteristic_features(pe=pe) +def extract_file_mixed_mode_characteristic_features(ctx) -> Iterator[tuple[Characteristic, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_mixed_mode_characteristic_features(pe) -def extract_file_namespace_features(pe: dnfile.dnPE) -> Iterator[tuple[Namespace, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_namespace_features(pe=pe) +def extract_file_namespace_features(ctx) -> Iterator[tuple[Namespace, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_namespace_features(pe) -def extract_file_class_features(pe: dnfile.dnPE) -> Iterator[tuple[Class, Address]]: - yield from capa.features.extractors.dotnetfile.extract_file_class_features(pe=pe) +def extract_file_class_features(ctx) -> Iterator[tuple[Class, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + yield from capa.features.extractors.dotnetfile.extract_file_class_features(pe) -def extract_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]: +def extract_features(ctx) -> Iterator[tuple[Feature, Address]]: for file_handler in FILE_HANDLERS: - for feature, address in file_handler(pe): + for feature, address in file_handler(ctx={"pe": ctx["pe"], "min_str_len": ctx["min_str_len"]}): yield feature, address diff --git a/capa/features/extractors/dnfile/insn.py b/capa/features/extractors/dnfile/insn.py index b80d01e762..59ec4a8ee7 100644 --- a/capa/features/extractors/dnfile/insn.py +++ b/capa/features/extractors/dnfile/insn.py @@ -198,7 +198,7 @@ def extract_insn_string_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iter if user_string is None: return - if len(user_string) >= 4: + if len(user_string) >= 
fh.ctx["min_str_len"]: yield String(user_string), ih.address diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py index dcba2c2f2d..a0a1b74198 100644 --- a/capa/features/extractors/dotnetfile.py +++ b/capa/features/extractors/dotnetfile.py @@ -38,6 +38,7 @@ Characteristic, ) from capa.features.address import NO_ADDRESS, Address, DNTokenAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.dnfile.types import DnType from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor from capa.features.extractors.dnfile.helpers import ( @@ -55,12 +56,13 @@ logger = logging.getLogger(__name__) -def extract_file_format(**kwargs) -> Iterator[tuple[Format, Address]]: +def extract_file_format(ctx) -> Iterator[tuple[Format, Address]]: yield Format(FORMAT_DOTNET), NO_ADDRESS yield Format(FORMAT_PE), NO_ADDRESS -def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Import, Address]]: +def extract_file_import_names(ctx) -> Iterator[tuple[Import, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx for method in get_dotnet_managed_imports(pe): # like System.IO.File::OpenRead yield Import(str(method)), DNTokenAddress(method.token) @@ -71,16 +73,18 @@ def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Impor yield Import(name), DNTokenAddress(imp.token) -def extract_file_function_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[FunctionName, Address]]: +def extract_file_function_names(ctx) -> Iterator[tuple[FunctionName, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx for method in get_dotnet_managed_methods(pe): yield FunctionName(str(method)), DNTokenAddress(method.token) -def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Namespace, Address]]: +def extract_file_namespace_features(ctx) -> Iterator[tuple[Namespace, Address]]: """emit namespace features from TypeRef and 
TypeDef tables""" # namespaces may be referenced multiple times, so we need to filter namespaces = set() + pe = ctx["pe"] if isinstance(ctx, dict) else ctx for _, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number): # emit internal .NET namespaces @@ -100,8 +104,9 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple yield Namespace(namespace), NO_ADDRESS -def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Class, Address]]: +def extract_file_class_features(ctx) -> Iterator[tuple[Class, Address]]: """emit class features from TypeRef and TypeDef tables""" + pe = ctx["pe"] if isinstance(ctx, dict) else ctx nested_class_table = get_dotnet_nested_class_table_index(pe) for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number): @@ -123,13 +128,14 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla yield Class(DnType.format_name(typerefname, namespace=typerefnamespace)), DNTokenAddress(token) -def extract_file_os(**kwargs) -> Iterator[tuple[OS, Address]]: +def extract_file_os(ctx) -> Iterator[tuple[OS, Address]]: yield OS(OS_ANY), NO_ADDRESS -def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Arch, Address]]: +def extract_file_arch(ctx) -> Iterator[tuple[Arch, Address]]: # to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020 # .NET 4.5 added option: any CPU, 32-bit preferred + pe = ctx["pe"] if isinstance(ctx, dict) else ctx assert pe.net is not None assert pe.net.Flags is not None @@ -141,20 +147,21 @@ def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Arch, Address yield Arch(ARCH_ANY), NO_ADDRESS -def extract_file_strings(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[String, Address]]: - yield from capa.features.extractors.common.extract_file_strings(pe.__data__) +def extract_file_strings(ctx) -> Iterator[tuple[String, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + min_str_len = 
ctx.get("min_str_len", DEFAULT_STRING_LENGTH) if isinstance(ctx, dict) else DEFAULT_STRING_LENGTH + yield from capa.features.extractors.common.extract_file_strings(pe.__data__, min_str_len) -def extract_file_mixed_mode_characteristic_features( - pe: dnfile.dnPE, **kwargs -) -> Iterator[tuple[Characteristic, Address]]: +def extract_file_mixed_mode_characteristic_features(ctx) -> Iterator[tuple[Characteristic, Address]]: + pe = ctx["pe"] if isinstance(ctx, dict) else ctx if is_dotnet_mixed_mode(pe): yield Characteristic("mixed mode"), NO_ADDRESS -def extract_file_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]: +def extract_file_features(ctx) -> Iterator[tuple[Feature, Address]]: for file_handler in FILE_HANDLERS: - for feature, addr in file_handler(pe=pe): # type: ignore + for feature, addr in file_handler(ctx): # type: ignore yield feature, addr @@ -169,9 +176,9 @@ def extract_file_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]: ) -def extract_global_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]: +def extract_global_features(ctx) -> Iterator[tuple[Feature, Address]]: for handler in GLOBAL_HANDLERS: - for feature, va in handler(pe=pe): # type: ignore + for feature, va in handler(ctx): # type: ignore yield feature, va @@ -182,10 +189,11 @@ def extract_global_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address] class DotnetFileFeatureExtractor(StaticFeatureExtractor): - def __init__(self, path: Path): + def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH): super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes())) self.path: Path = path self.pe: dnfile.dnPE = dnfile.dnPE(str(path)) + self.min_str_len = min_str_len def get_base_address(self): return NO_ADDRESS @@ -203,7 +211,7 @@ def extract_global_features(self): yield from extract_global_features(self.pe) def extract_file_features(self): - yield from extract_file_features(self.pe) + yield from extract_file_features(ctx={"pe": 
self.pe, "min_str_len": self.min_str_len}) def is_dotnet_file(self) -> bool: return bool(self.pe.net) diff --git a/capa/features/extractors/elffile.py b/capa/features/extractors/elffile.py index 3f4eea7522..101d36f697 100644 --- a/capa/features/extractors/elffile.py +++ b/capa/features/extractors/elffile.py @@ -23,12 +23,15 @@ from capa.features.file import Export, Import, Section from capa.features.common import OS, FORMAT_ELF, Arch, Format, Feature from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor logger = logging.getLogger(__name__) -def extract_file_export_names(elf: ELFFile, **kwargs): +def extract_file_export_names(ctx): + elf = ctx["elf"] if isinstance(ctx, dict) else ctx + for section in elf.iter_sections(): if not isinstance(section, SymbolTableSection): continue @@ -79,7 +82,8 @@ def extract_file_export_names(elf: ELFFile, **kwargs): yield Export(symbol.name), AbsoluteVirtualAddress(symbol.entry.st_value) -def extract_file_import_names(elf: ELFFile, **kwargs): +def extract_file_import_names(ctx): + elf = ctx["elf"] if isinstance(ctx, dict) else ctx symbol_name_by_index: dict[int, str] = {} # Extract symbol names and store them in the dictionary @@ -139,7 +143,9 @@ def extract_file_import_names(elf: ELFFile, **kwargs): yield Import(symbol_name), FileOffsetAddress(symbol_address) -def extract_file_section_names(elf: ELFFile, **kwargs): +def extract_file_section_names(ctx): + elf = ctx["elf"] if isinstance(ctx, dict) else ctx + for section in elf.iter_sections(): if section.name: yield Section(section.name), AbsoluteVirtualAddress(section.header.sh_addr) @@ -147,8 +153,9 @@ def extract_file_section_names(elf: ELFFile, **kwargs): yield Section("NULL"), AbsoluteVirtualAddress(section.header.sh_addr) -def extract_file_strings(buf, **kwargs): - yield from 
capa.features.extractors.common.extract_file_strings(buf) +def extract_file_strings(ctx): + + yield from capa.features.extractors.common.extract_file_strings(ctx["buf"], ctx["min_str_len"]) def extract_file_os(elf: ELFFile, buf, **kwargs): @@ -179,9 +186,9 @@ def extract_file_arch(elf: ELFFile, **kwargs): logger.warning("unsupported architecture: %s", arch) -def extract_file_features(elf: ELFFile, buf: bytes) -> Iterator[tuple[Feature, int]]: +def extract_file_features(ctx) -> Iterator[tuple[Feature, int]]: for file_handler in FILE_HANDLERS: - for feature, addr in file_handler(elf=elf, buf=buf): # type: ignore + for feature, addr in file_handler(ctx=ctx): # type: ignore yield feature, addr @@ -208,9 +215,10 @@ def extract_global_features(elf: ELFFile, buf: bytes) -> Iterator[tuple[Feature, class ElfFeatureExtractor(StaticFeatureExtractor): - def __init__(self, path: Path): + def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH): super().__init__(SampleHashes.from_bytes(path.read_bytes())) self.path: Path = path + self.min_str_len = min_str_len self.elf = ELFFile(io.BytesIO(path.read_bytes())) def get_base_address(self): @@ -228,7 +236,7 @@ def extract_global_features(self): def extract_file_features(self): buf = self.path.read_bytes() - for feature, addr in extract_file_features(self.elf, buf): + for feature, addr in extract_file_features(ctx={"elf": self.elf, "buf": buf, "min_str_len": self.min_str_len}): yield feature, addr def get_functions(self): diff --git a/capa/features/extractors/ida/extractor.py b/capa/features/extractors/ida/extractor.py index b139f2f38f..e03e8d257c 100644 --- a/capa/features/extractors/ida/extractor.py +++ b/capa/features/extractors/ida/extractor.py @@ -25,6 +25,7 @@ import capa.features.extractors.ida.basicblock from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from 
capa.features.extractors.base_extractor import ( BBHandle, InsnHandle, @@ -35,7 +36,7 @@ class IdaFeatureExtractor(StaticFeatureExtractor): - def __init__(self): + def __init__(self, min_str_len: int = DEFAULT_STRING_LENGTH): super().__init__( hashes=SampleHashes( md5=capa.ida.helpers.retrieve_input_file_md5(), @@ -47,6 +48,7 @@ def __init__(self): self.global_features.extend(capa.features.extractors.ida.file.extract_file_format()) self.global_features.extend(capa.features.extractors.ida.global_.extract_os()) self.global_features.extend(capa.features.extractors.ida.global_.extract_arch()) + self.min_str_len = min_str_len def get_base_address(self): return AbsoluteVirtualAddress(idaapi.get_imagebase()) @@ -55,7 +57,7 @@ def extract_global_features(self): yield from self.global_features def extract_file_features(self): - yield from capa.features.extractors.ida.file.extract_features() + yield from capa.features.extractors.ida.file.extract_features(ctx={"min_str_len": self.min_str_len}) def get_functions(self) -> Iterator[FunctionHandle]: import capa.features.extractors.ida.helpers as ida_helpers @@ -64,9 +66,10 @@ def get_functions(self) -> Iterator[FunctionHandle]: yield from ida_helpers.get_functions(skip_thunks=True, skip_libs=True) -    @staticmethod -    def get_function(ea: int) -> FunctionHandle: f = idaapi.get_func(ea) -    return FunctionHandle(address=AbsoluteVirtualAddress(f.start_ea), inner=f) +    def get_function(self, ea: int) -> FunctionHandle: +    return FunctionHandle( +        address=AbsoluteVirtualAddress(f.start_ea), inner=f, ctx={"min_str_len": self.min_str_len} +    ) def extract_function_features(self, fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]: yield from capa.features.extractors.ida.function.extract_features(fh) diff --git a/capa/features/extractors/ida/file.py b/capa/features/extractors/ida/file.py index a47f1524c5..6ccfa8787e 100644 --- a/capa/features/extractors/ida/file.py +++ b/capa/features/extractors/ida/file.py @@ -78,7 +78,7 @@ def 
check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[tuple[int, int]]: yield off, i -def extract_file_embedded_pe() -> Iterator[tuple[Feature, Address]]: +def extract_file_embedded_pe(ctx) -> Iterator[tuple[Feature, Address]]: """extract embedded PE features IDA must load resource sections for this to be complete @@ -90,7 +90,7 @@ def extract_file_embedded_pe() -> Iterator[tuple[Feature, Address]]: yield Characteristic("embedded pe"), FileOffsetAddress(ea) -def extract_file_export_names() -> Iterator[tuple[Feature, Address]]: +def extract_file_export_names(ctx) -> Iterator[tuple[Feature, Address]]: """extract function exports""" for _, ordinal, ea, name in idautils.Entries(): forwarded_name = ida_entry.get_entry_forwarder(ordinal) @@ -102,7 +102,7 @@ def extract_file_export_names() -> Iterator[tuple[Feature, Address]]: yield Characteristic("forwarded export"), AbsoluteVirtualAddress(ea) -def extract_file_import_names() -> Iterator[tuple[Feature, Address]]: +def extract_file_import_names(ctx) -> Iterator[tuple[Feature, Address]]: """extract function imports 1. 
imports by ordinal: @@ -138,7 +138,7 @@ def extract_file_import_names() -> Iterator[tuple[Feature, Address]]: yield Import(info[1]), AbsoluteVirtualAddress(ea) -def extract_file_section_names() -> Iterator[tuple[Feature, Address]]: +def extract_file_section_names(ctx) -> Iterator[tuple[Feature, Address]]: """extract section names IDA must load resource sections for this to be complete @@ -149,7 +149,7 @@ def extract_file_section_names() -> Iterator[tuple[Feature, Address]]: yield Section(idaapi.get_segm_name(seg)), AbsoluteVirtualAddress(seg.start_ea) -def extract_file_strings() -> Iterator[tuple[Feature, Address]]: +def extract_file_strings(ctx) -> Iterator[tuple[Feature, Address]]: """extract ASCII and UTF-16 LE strings IDA must load resource sections for this to be complete @@ -160,14 +160,14 @@ def extract_file_strings() -> Iterator[tuple[Feature, Address]]: seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(seg) # differing to common string extractor factor in segment offset here - for s in capa.features.extractors.strings.extract_ascii_strings(seg_buff): + for s in capa.features.extractors.strings.extract_ascii_strings(seg_buff, min_str_len=ctx["min_str_len"]): yield String(s.s), FileOffsetAddress(seg.start_ea + s.offset) - for s in capa.features.extractors.strings.extract_unicode_strings(seg_buff): + for s in capa.features.extractors.strings.extract_unicode_strings(seg_buff, min_str_len=ctx["min_str_len"]): yield String(s.s), FileOffsetAddress(seg.start_ea + s.offset) -def extract_file_function_names() -> Iterator[tuple[Feature, Address]]: +def extract_file_function_names(ctx) -> Iterator[tuple[Feature, Address]]: """ extract the names of statically-linked library functions. 
""" @@ -184,7 +184,7 @@ def extract_file_function_names() -> Iterator[tuple[Feature, Address]]: yield FunctionName(name[1:]), addr -def extract_file_format() -> Iterator[tuple[Feature, Address]]: +def extract_file_format(ctx = None) -> Iterator[tuple[Feature, Address]]: filetype = capa.ida.helpers.get_filetype() if filetype in (idaapi.f_PE, idaapi.f_COFF): @@ -198,10 +198,10 @@ def extract_file_format() -> Iterator[tuple[Feature, Address]]: raise NotImplementedError(f"unexpected file format: {filetype}") -def extract_features() -> Iterator[tuple[Feature, Address]]: +def extract_features(ctx) -> Iterator[tuple[Feature, Address]]: """extract file features""" for file_handler in FILE_HANDLERS: - for feature, addr in file_handler(): + for feature, addr in file_handler(ctx=ctx): yield feature, addr diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index 365a20675c..2bdb23e2ec 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -23,6 +23,7 @@ import ida_segment from capa.features.address import AbsoluteVirtualAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import FunctionHandle IDA_NALT_ENCODING = ida_nalt.get_default_encoding_idx(ida_nalt.BPU_1B) # use one byte-per-character encoding @@ -246,10 +247,10 @@ def read_bytes_at(ea: int, count: int) -> bytes: return idc.get_bytes(ea, count) -def find_string_at(ea: int, min_: int = 4) -> str: +def find_string_at(ea: int, min_str_len: int = DEFAULT_STRING_LENGTH) -> str: """check if ASCII string exists at a given virtual address""" found = idaapi.get_strlit_contents(ea, -1, idaapi.STRTYPE_C) - if found and len(found) >= min_: + if found and len(found) >= min_str_len: try: found = found.decode("ascii") # hacky check for IDA bug; get_strlit_contents also reads Unicode as diff --git a/capa/features/extractors/ida/insn.py b/capa/features/extractors/ida/insn.py index 
0e92b21f5e..45d9fca40a 100644 --- a/capa/features/extractors/ida/insn.py +++ b/capa/features/extractors/ida/insn.py @@ -221,8 +221,8 @@ def extract_insn_string_features( ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn) if ref != insn.ea: - found = capa.features.extractors.ida.helpers.find_string_at(ref) - if found: + found = capa.features.extractors.ida.helpers.find_string_at(ref, fh.ctx["min_str_len"]) + if found and len(found) >= fh.ctx["min_str_len"]: yield String(found), ih.address diff --git a/capa/features/extractors/pefile.py b/capa/features/extractors/pefile.py index 8b76e1d8ab..317c321df4 100644 --- a/capa/features/extractors/pefile.py +++ b/capa/features/extractors/pefile.py @@ -26,17 +26,21 @@ from capa.features.file import Export, Import, Section from capa.features.common import OS, ARCH_I386, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Characteristic from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor logger = logging.getLogger(__name__) -def extract_file_embedded_pe(buf, **kwargs): +def extract_file_embedded_pe(ctx): + buf = ctx["buf"] + for offset, _ in capa.features.extractors.helpers.carve_pe(buf, 1): yield Characteristic("embedded pe"), FileOffsetAddress(offset) -def extract_file_export_names(pe, **kwargs): +def extract_file_export_names(ctx): + pe = ctx["pe"] if isinstance(ctx, dict) else ctx base_address = pe.OPTIONAL_HEADER.ImageBase if hasattr(pe, "DIRECTORY_ENTRY_EXPORT"): @@ -63,7 +67,7 @@ def extract_file_export_names(pe, **kwargs): yield Characteristic("forwarded export"), AbsoluteVirtualAddress(va) -def extract_file_import_names(pe, **kwargs): +def extract_file_import_names(ctx): """ extract imported function names 1. 
imports by ordinal: @@ -72,6 +76,8 @@ def extract_file_import_names(pe, **kwargs): - modulename.importname - importname """ + pe = ctx["pe"] if isinstance(ctx, dict) else ctx + if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"): for dll in pe.DIRECTORY_ENTRY_IMPORT: try: @@ -95,7 +101,8 @@ def extract_file_import_names(pe, **kwargs): yield Import(name), AbsoluteVirtualAddress(imp.address) -def extract_file_section_names(pe, **kwargs): +def extract_file_section_names(ctx): + pe = ctx["pe"] if isinstance(ctx, dict) else ctx base_address = pe.OPTIONAL_HEADER.ImageBase for section in pe.sections: @@ -107,8 +114,8 @@ def extract_file_section_names(pe, **kwargs): yield Section(name), AbsoluteVirtualAddress(base_address + section.VirtualAddress) -def extract_file_strings(buf, **kwargs): - yield from capa.features.extractors.common.extract_file_strings(buf) +def extract_file_strings(ctx): + yield from capa.features.extractors.common.extract_file_strings(ctx["buf"], ctx["min_str_len"]) def extract_file_function_names(**kwargs): @@ -143,7 +150,7 @@ def extract_file_arch(pe, **kwargs): logger.warning("unknown architecture: %s", pe.FILE_HEADER.Machine) -def extract_file_features(pe, buf): +def extract_file_features(ctx): """ extract file features from given workspace @@ -154,10 +161,9 @@ def extract_file_features(pe, buf): yields: tuple[Feature, VA]: a feature and its location. 
""" - for file_handler in FILE_HANDLERS: # file_handler: type: (pe, bytes) -> Iterable[tuple[Feature, Address]] - for feature, va in file_handler(pe=pe, buf=buf): # type: ignore + for feature, va in file_handler(ctx=ctx): # type: ignore yield feature, va @@ -196,10 +202,11 @@ def extract_global_features(pe, buf): class PefileFeatureExtractor(StaticFeatureExtractor): - def __init__(self, path: Path): + def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH): super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes())) self.path: Path = path self.pe = pefile.PE(str(path)) + self.min_str_len = min_str_len def get_base_address(self): return AbsoluteVirtualAddress(self.pe.OPTIONAL_HEADER.ImageBase) @@ -212,7 +219,7 @@ def extract_global_features(self): def extract_file_features(self): buf = Path(self.path).read_bytes() - yield from extract_file_features(self.pe, buf) + yield from extract_file_features(ctx={"pe": self.pe, "buf": buf, "min_str_len": self.min_str_len}) def get_functions(self): raise NotImplementedError("PefileFeatureExtract can only be used to extract file features") diff --git a/capa/features/extractors/strings.py b/capa/features/extractors/strings.py index 04a28f64b7..24a653afbb 100644 --- a/capa/features/extractors/strings.py +++ b/capa/features/extractors/strings.py @@ -20,11 +20,12 @@ from dataclasses import dataclass from collections.abc import Iterator +DEFAULT_STRING_LENGTH = 4 ASCII_BYTE = r" !\"#\$%&\'\(\)\*\+,-\./0123456789:;<=>\?@ABCDEFGHIJKLMNOPQRSTUVWXYZ\[\]\^_`abcdefghijklmnopqrstuvwxyz\{\|\}\\\~\t".encode( "ascii" ) -ASCII_RE_4 = re.compile(b"([%s]{%d,})" % (ASCII_BYTE, 4)) -UNICODE_RE_4 = re.compile(b"((?:[%s]\x00){%d,})" % (ASCII_BYTE, 4)) +ASCII_RE_DEFAULT = re.compile(b"([%s]{%d,})" % (ASCII_BYTE, DEFAULT_STRING_LENGTH)) +UNICODE_RE_DEFAULT = re.compile(b"((?:[%s]\x00){%d,})" % (ASCII_BYTE, DEFAULT_STRING_LENGTH)) REPEATS = {ord("A"), 0x00, 0xFE, 0xFF} SLICE_SIZE = 4096 PRINTABLE_CHAR_SET = set(string.printable) 
@@ -78,56 +79,56 @@ def buf_filled_with(buf: bytes, character: int) -> bool: return True -def extract_ascii_strings(buf: bytes, n: int = 4) -> Iterator[String]: +def extract_ascii_strings(buf: bytes, min_str_len=DEFAULT_STRING_LENGTH) -> Iterator[String]: """ Extract ASCII strings from the given binary data. Params: buf: the bytes from which to extract strings - n: minimum string length + min_len: minimum string length """ if not buf: return - if n < 1: + if min_str_len < 1: raise ValueError("minimum string length must be positive") if (buf[0] in REPEATS) and buf_filled_with(buf, buf[0]): return r = None - if n == 4: - r = ASCII_RE_4 + if min_str_len == DEFAULT_STRING_LENGTH: + r = ASCII_RE_DEFAULT else: - reg = b"([%s]{%d,})" % (ASCII_BYTE, n) + reg = b"([%s]{%d,})" % (ASCII_BYTE, min_str_len) r = re.compile(reg) for match in r.finditer(buf): yield String(match.group().decode("ascii"), match.start()) -def extract_unicode_strings(buf: bytes, n: int = 4) -> Iterator[String]: +def extract_unicode_strings(buf: bytes, min_str_len=DEFAULT_STRING_LENGTH) -> Iterator[String]: """ Extract naive UTF-16 strings from the given binary data. 
Params: buf: the bytes from which to extract strings - n: minimum string length + min_len: minimum string length """ if not buf: return - if n < 1: + if min_str_len < 1: raise ValueError("minimum string length must be positive") if (buf[0] in REPEATS) and buf_filled_with(buf, buf[0]): return - if n == 4: - r = UNICODE_RE_4 + if min_str_len == DEFAULT_STRING_LENGTH: + r = UNICODE_RE_DEFAULT else: - reg = b"((?:[%s]\x00){%d,})" % (ASCII_BYTE, n) + reg = b"((?:[%s]\x00){%d,})" % (ASCII_BYTE, min_str_len) r = re.compile(reg) for match in r.finditer(buf): with contextlib.suppress(UnicodeDecodeError): diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py index 99d60e4a80..4e6cf0681e 100644 --- a/capa/features/extractors/viv/extractor.py +++ b/capa/features/extractors/viv/extractor.py @@ -27,6 +27,7 @@ import capa.features.extractors.viv.basicblock from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import ( BBHandle, InsnHandle, @@ -39,15 +40,16 @@ class VivisectFeatureExtractor(StaticFeatureExtractor): - def __init__(self, vw, path: Path, os): + def __init__(self, vw, path: Path, os, min_str_len: int = DEFAULT_STRING_LENGTH): self.vw = vw self.path = path self.buf = path.read_bytes() + self.min_str_len = min_str_len super().__init__(hashes=SampleHashes.from_bytes(self.buf)) # pre-compute these because we'll yield them at *every* scope. 
self.global_features: list[tuple[Feature, Address]] = [] - self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(self.buf)) + self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(ctx={"buf": self.buf})) self.global_features.extend(capa.features.extractors.common.extract_os(self.buf, os)) self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw)) @@ -59,13 +61,17 @@ def extract_global_features(self): yield from self.global_features def extract_file_features(self): - yield from capa.features.extractors.viv.file.extract_features(self.vw, self.buf) + yield from capa.features.extractors.viv.file.extract_features( + ctx={"vw": self.vw, "buf": self.buf, "min_str_len": self.min_str_len} + ) def get_functions(self) -> Iterator[FunctionHandle]: cache: dict[str, Any] = {} for va in sorted(self.vw.getFunctions()): yield FunctionHandle( - address=AbsoluteVirtualAddress(va), inner=viv_utils.Function(self.vw, va), ctx={"cache": cache} + address=AbsoluteVirtualAddress(va), + inner=viv_utils.Function(self.vw, va), + ctx={"cache": cache, "min_str_len": self.min_str_len}, ) def extract_function_features(self, fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index 5f9df620a5..69e8738c26 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -28,7 +28,8 @@ from capa.features.address import Address, FileOffsetAddress, AbsoluteVirtualAddress -def extract_file_embedded_pe(buf, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_embedded_pe(ctx) -> Iterator[tuple[Feature, Address]]: + buf = ctx["buf"] for offset, _ in pe_carve.carve(buf, 1): yield Characteristic("embedded pe"), FileOffsetAddress(offset) @@ -44,7 +45,8 @@ def get_first_vw_filename(vw: vivisect.VivWorkspace): return next(iter(vw.filemeta.keys())) -def extract_file_export_names(vw: 
vivisect.VivWorkspace, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_export_names(ctx) -> Iterator[tuple[Feature, Address]]: + vw = ctx["vw"] for va, _, name, _ in vw.getExports(): yield Export(name), AbsoluteVirtualAddress(va) @@ -63,7 +65,7 @@ def extract_file_export_names(vw: vivisect.VivWorkspace, **kwargs) -> Iterator[t yield Characteristic("forwarded export"), AbsoluteVirtualAddress(va) -def extract_file_import_names(vw, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_import_names(ctx) -> Iterator[tuple[Feature, Address]]: """ extract imported function names 1. imports by ordinal: @@ -72,6 +74,7 @@ def extract_file_import_names(vw, **kwargs) -> Iterator[tuple[Feature, Address]] - modulename.importname - importname """ + vw = ctx["vw"] for va, _, _, tinfo in vw.getImports(): # vivisect source: tinfo = "%s.%s" % (libname, impname) modname, impname = tinfo.split(".", 1) @@ -98,19 +101,23 @@ def is_viv_ord_impname(impname: str) -> bool: return True -def extract_file_section_names(vw, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_section_names(ctx) -> Iterator[tuple[Feature, Address]]: + vw = ctx["vw"] for va, _, segname, _ in vw.getSegments(): yield Section(segname), AbsoluteVirtualAddress(va) -def extract_file_strings(buf, **kwargs) -> Iterator[tuple[Feature, Address]]: - yield from capa.features.extractors.common.extract_file_strings(buf) +def extract_file_strings(ctx) -> Iterator[tuple[Feature, Address]]: + buf = ctx["buf"] + min_str_len = ctx["min_str_len"] + yield from capa.features.extractors.common.extract_file_strings(buf, min_str_len) -def extract_file_function_names(vw, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_function_names(ctx) -> Iterator[tuple[Feature, Address]]: """ extract the names of statically-linked library functions. 
""" + vw = ctx["vw"] for va in sorted(vw.getFunctions()): addr = AbsoluteVirtualAddress(va) if viv_utils.flirt.is_library_function(vw, va): @@ -124,24 +131,25 @@ def extract_file_function_names(vw, **kwargs) -> Iterator[tuple[Feature, Address yield FunctionName(name[1:]), addr -def extract_file_format(buf, **kwargs) -> Iterator[tuple[Feature, Address]]: +def extract_file_format(ctx) -> Iterator[tuple[Feature, Address]]: + buf = ctx["buf"] yield from capa.features.extractors.common.extract_format(buf) -def extract_features(vw, buf: bytes) -> Iterator[tuple[Feature, Address]]: +def extract_features(ctx) -> Iterator[tuple[Feature, Address]]: """ extract file features from given workspace args: vw (vivisect.VivWorkspace): the vivisect workspace buf: the raw input file bytes + ctx (dict): A context dictionary containing metadata and configuration information for extraction. yields: tuple[Feature, Address]: a feature and its location. """ - for file_handler in FILE_HANDLERS: - for feature, addr in file_handler(vw=vw, buf=buf): # type: ignore + for feature, addr in file_handler(ctx=ctx): # type: ignore yield feature, addr diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index 0b3e79f990..00fe8bca25 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -708,7 +708,7 @@ def extract_op_string_features( except ValueError: continue else: - if len(s) >= 4: + if len(s) >= fh.ctx["min_str_len"]: yield String(s), ih.address diff --git a/capa/features/extractors/vmray/file.py b/capa/features/extractors/vmray/file.py index 7a8c914942..bacf48e688 100644 --- a/capa/features/extractors/vmray/file.py +++ b/capa/features/extractors/vmray/file.py @@ -21,6 +21,7 @@ from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress from capa.features.extractors.vmray import VMRayAnalysis from capa.features.extractors.helpers import generate_symbols +from capa.features.extractors.strings import 
DEFAULT_STRING_LENGTH logger = logging.getLogger(__name__) @@ -68,7 +69,9 @@ def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[t def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: if analysis.submission_static is not None: - yield from capa.features.extractors.common.extract_file_strings(analysis.submission_bytes) + yield from capa.features.extractors.common.extract_file_strings( + analysis.submission_bytes, min_str_len=DEFAULT_STRING_LENGTH + ) def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 36d104c894..af19391fbe 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -1014,7 +1014,7 @@ def load_capa_function_results(self): try: f = idaapi.get_func(idaapi.get_screen_ea()) if f is not None: - self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea) + self.rulegen_current_function = self.rulegen_feature_extractor.get_function(ea=f.start_ea) except Exception as e: logger.exception("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e) return False diff --git a/capa/loader.py b/capa/loader.py index ec0295ac8f..9bf01cb302 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -60,6 +60,7 @@ ) from capa.features.address import Address from capa.capabilities.common import Capabilities +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, @@ -211,6 +212,7 @@ def get_extractor( should_save_workspace=False, disable_progress=False, sample_path: Optional[Path] = None, + min_str_len=DEFAULT_STRING_LENGTH, ) -> FeatureExtractor: """ raises: @@ -245,7 +247,7 @@ def get_extractor( if input_format not in (FORMAT_PE, FORMAT_DOTNET): raise UnsupportedFormatError() - return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path) + return 
capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path, min_str_len) elif backend == BACKEND_BINJA: import capa.features.extractors.binja.find_binja_api as finder @@ -280,7 +282,7 @@ def get_extractor( elif backend == BACKEND_PEFILE: import capa.features.extractors.pefile - return capa.features.extractors.pefile.PefileFeatureExtractor(input_path) + return capa.features.extractors.pefile.PefileFeatureExtractor(input_path, min_str_len) elif backend == BACKEND_VIV: import capa.features.extractors.viv.extractor @@ -308,7 +310,7 @@ def get_extractor( else: logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_) + return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_, min_str_len) elif backend == BACKEND_FREEZE: return frz.load(input_path.read_bytes()) @@ -377,7 +379,9 @@ def _get_binexport2_file_extractors(input_file: Path) -> list[FeatureExtractor]: return [] -def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtractor]: +def get_file_extractors( + input_file: Path, input_format: str, min_str_len: int = DEFAULT_STRING_LENGTH +) -> list[FeatureExtractor]: file_extractors: list[FeatureExtractor] = [] # we use lazy importing here to avoid eagerly loading dependencies @@ -387,19 +391,19 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr if input_format == FORMAT_PE: import capa.features.extractors.pefile - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) + file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file, min_str_len)) elif input_format == FORMAT_DOTNET: import capa.features.extractors.pefile import capa.features.extractors.dotnetfile - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) + 
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file, min_str_len)) file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file)) elif input_format == FORMAT_ELF: import capa.features.extractors.elffile - file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file)) + file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file, min_str_len)) elif input_format == FORMAT_CAPE: import capa.features.extractors.cape.extractor diff --git a/capa/main.py b/capa/main.py index 3e4af74e79..39e2e667ec 100644 --- a/capa/main.py +++ b/capa/main.py @@ -106,6 +106,7 @@ find_file_capabilities, has_dynamic_limitation, ) +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import ( ProcessFilter, FunctionFilter, @@ -707,7 +708,7 @@ def get_rules_from_cli(args) -> RuleSet: return rules -def get_file_extractors_from_cli(args, input_format: str) -> list[FeatureExtractor]: +def get_file_extractors_from_cli(args, input_format: str, min_str_len: int) -> list[FeatureExtractor]: """ args: args: The parsed command line arguments from `install_common_args`. 
@@ -724,7 +725,7 @@ def get_file_extractors_from_cli(args, input_format: str) -> list[FeatureExtract # this pass can inspect multiple file extractors, e.g., dotnet and pe to identify # various limitations try: - return capa.loader.get_file_extractors(args.input_file, input_format) + return capa.loader.get_file_extractors(args.input_file, input_format, min_str_len) except PEFormatError as e: logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) raise ShouldExitError(E_CORRUPT_FILE) from e @@ -877,6 +878,7 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr should_save_workspace=should_save_workspace, disable_progress=args.quiet or args.debug, sample_path=sample_path, + min_str_len=DEFAULT_STRING_LENGTH, ) return apply_extractor_filters(extractor, extractor_filters) except UnsupportedFormatError as e: @@ -1017,7 +1019,7 @@ def main(argv: Optional[list[str]] = None): rules: RuleSet = get_rules_from_cli(args) found_limitation = False - file_extractors = get_file_extractors_from_cli(args, input_format) + file_extractors = get_file_extractors_from_cli(args, input_format, DEFAULT_STRING_LENGTH) if input_format in STATIC_FORMATS: # only static extractors have file limitations found_limitation = find_static_limitations_from_cli(args, rules, file_extractors) @@ -1034,19 +1036,20 @@ def main(argv: Optional[list[str]] = None): except ShouldExitError as e: return e.status_code - capabilities: Capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) + if input_format != FORMAT_RESULT: + capabilities: Capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta: rdoc.Metadata = capa.loader.collect_metadata( - argv, args.input_file, input_format, os_, args.rules, extractor, capabilities - ) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) + meta: rdoc.Metadata = capa.loader.collect_metadata( + argv, args.input_file, input_format, 
os_, args.rules, extractor, capabilities + ) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - if found_limitation: - # bail if capa's static feature extractor encountered file limitation e.g. a packed binary - # or capa's dynamic feature extractor encountered some limitation e.g. a dotnet sample - # do show the output in verbose mode, though. - if not (args.verbose or args.vverbose or args.json): - return E_FILE_LIMITATION + if found_limitation: + # bail if capa's static feature extractor encountered file limitation e.g. a packed binary + # or capa's dynamic feature extractor encountered some limitation e.g. a dotnet sample + # do show the output in verbose mode, though. + if not (args.verbose or args.vverbose or args.json): + return E_FILE_LIMITATION if args.json: print(capa.render.json.render(meta, rules, capabilities.matches)) @@ -1091,7 +1094,9 @@ def ida_main(): meta = capa.ida.helpers.collect_metadata([rules_path]) - capabilities = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) + capabilities = find_capabilities( + rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor(DEFAULT_STRING_LENGTH) + ) meta.analysis.feature_counts = capabilities.feature_counts meta.analysis.library_functions = capabilities.library_functions diff --git a/rules b/rules index d64c2c91ea..6697513245 160000 --- a/rules +++ b/rules @@ -1 +1 @@ -Subproject commit d64c2c91ea4be309fb42aea13bf185bf76013ea2 +Subproject commit 66975132455e3e22520a84dca14cca1d3afd292a diff --git a/tests/fixtures.py b/tests/fixtures.py index b9199061d5..c1ce4fe114 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -43,6 +43,7 @@ FeatureAccess, ) from capa.features.address import Address +from capa.features.extractors.strings import DEFAULT_STRING_LENGTH from capa.features.extractors.base_extractor import ( BBHandle, CallHandle, @@ -117,7 +118,9 @@ def get_viv_extractor(path: Path): else: vw = 
capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) vw.saveWorkspace() - extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO) + extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor( + vw, path, OS_AUTO, DEFAULT_STRING_LENGTH + ) fixup_viv(path, extractor) return extractor @@ -137,8 +140,9 @@ def fixup_viv(path: Path, extractor): @lru_cache(maxsize=1) def get_pefile_extractor(path: Path): import capa.features.extractors.pefile + from capa.features.extractors.strings import DEFAULT_STRING_LENGTH - extractor = capa.features.extractors.pefile.PefileFeatureExtractor(path) + extractor = capa.features.extractors.pefile.PefileFeatureExtractor(path, DEFAULT_STRING_LENGTH) # overload the extractor so that the fixture exposes `extractor.path` setattr(extractor, "path", path.as_posix()) @@ -149,8 +153,9 @@ def get_pefile_extractor(path: Path): @lru_cache(maxsize=1) def get_dnfile_extractor(path: Path): import capa.features.extractors.dnfile.extractor + from capa.features.extractors.strings import DEFAULT_STRING_LENGTH - extractor = capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) + extractor = capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path, DEFAULT_STRING_LENGTH) # overload the extractor so that the fixture exposes `extractor.path` setattr(extractor, "path", path.as_posix()) @@ -161,8 +166,9 @@ def get_dnfile_extractor(path: Path): @lru_cache(maxsize=1) def get_dotnetfile_extractor(path: Path): import capa.features.extractors.dotnetfile + from capa.features.extractors.strings import DEFAULT_STRING_LENGTH - extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(path) + extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(path, DEFAULT_STRING_LENGTH) # overload the extractor so that the fixture exposes `extractor.path` setattr(extractor, "path", path.as_posix()) diff --git a/tests/test_strings.py 
b/tests/test_strings.py index 727af67d97..0ff95b7737 100644 --- a/tests/test_strings.py +++ b/tests/test_strings.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest from capa.features.extractors.strings import ( String, buf_filled_with, @@ -47,7 +48,7 @@ def test_extract_ascii_strings(): # min length buf = b"Hi\x00Test\x00" - strings = list(extract_ascii_strings(buf, n=4)) + strings = list(extract_ascii_strings(buf, min_str_len=4)) assert len(strings) == 1 assert strings[0] == String("Test", 3) @@ -75,7 +76,7 @@ def test_extract_unicode_strings(): # min length buf = b"H\x00i\x00\x00\x00T\x00e\x00s\x00t\x00\x00\x00" - strings = list(extract_unicode_strings(buf, n=4)) + strings = list(extract_unicode_strings(buf, min_str_len=4)) assert len(strings) == 1 assert strings[0] == String("Test", 6) @@ -103,3 +104,46 @@ def test_is_printable_str(): assert is_printable_str("") is True # empty string assert is_printable_str(" ") is True # single space assert is_printable_str("\x7f") is False # DEL character + +def test_min_str_len(): + # Test invalid min_str_len values + with pytest.raises(ValueError): + list(extract_ascii_strings(b"test", min_str_len=0)) + with pytest.raises(ValueError): + list(extract_ascii_strings(b"test", min_str_len=-1)) + + # Test with ASCII strings + buf = b"a\x00ab\x00abc\x00abcd\x00abcde\x00" + + # Test with min_str_len=1 (minimum allowed) + strings = list(extract_ascii_strings(buf, min_str_len=1)) + assert len(strings) == 5 + assert [s.s for s in strings] == ["a", "ab", "abc", "abcd", "abcde"] + + # Test with min_str_len=3 + strings = list(extract_ascii_strings(buf, min_str_len=3)) + assert len(strings) == 3 + assert [s.s for s in strings] == ["abc", "abcd", "abcde"] + + # Test with min_str_len=5 + strings = list(extract_ascii_strings(buf, min_str_len=5)) + assert len(strings) == 1 + assert strings[0].s == "abcde" + + # Test Unicode strings + unicode_buf = ( + 
b"a\x00\x00\x00" # 'a' (len 1) + b"a\x00b\x00\x00\x00" # 'ab' (len 2) + b"a\x00b\x00c\x00\x00\x00" # 'abc' (len 3) + b"a\x00b\x00c\x00d\x00\x00\x00" # 'abcd' (len 4) + ) + + # Test with default min_str_len=4 for Unicode + strings = list(extract_unicode_strings(unicode_buf)) + assert len(strings) == 1 + assert strings[0].s == "abcd" + + # Test with min_str_len=2 for Unicode + strings = list(extract_unicode_strings(unicode_buf, min_str_len=2)) + assert len(strings) == 3 + assert [s.s for s in strings] == ["ab", "abc", "abcd"]