Skip to content

Makes string length configurable and consistent across backends. Closes #1303, #2678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f26d321
added default string length and changed n to min_len in strings.py
Shajal-Kumar Mar 18, 2025
59ef086
added min_str_len argument to common.py
Shajal-Kumar Mar 18, 2025
0649227
added ctx to elffile.py
Shajal-Kumar Mar 19, 2025
68d59cd
added ctx to pefile.py and made minor changes to elffile.py
Shajal-Kumar Mar 19, 2025
bb8ad59
fixed argument related issues and added ctx to pefile.py
Shajal-Kumar Mar 19, 2025
fa234a6
added minimum string length functionality to loader.py and main.py. A…
Shajal-Kumar Mar 20, 2025
eaa62c6
added minimum string length functionality to the Viv Extractor
Shajal-Kumar Mar 21, 2025
bb19023
Added min_str_len and context dictionary to dnfile
Shajal-Kumar Mar 21, 2025
a3071fd
Added min_str_len and context dictionary to ida.
Shajal-Kumar Mar 21, 2025
ddbbacd
Updated tests/fixtures.py to incorporate DEFAULT_STRING_LENGTH
Shajal-Kumar Mar 21, 2025
b19cb70
Fixed dotnetfile.py, fixed issues with vmrayextractor, added ctx base…
Shajal-Kumar Mar 24, 2025
df5aa67
Fixed dnfile to check if the parameter being passed is a dictionary o…
Shajal-Kumar Mar 25, 2025
666501e
Fixed failing tests.
Shajal-Kumar Mar 26, 2025
954f7a2
Passes all tests. Fixes linting issues.
Shajal-Kumar Mar 27, 2025
d64d4f7
Merge branch 'mandiant:master' into master
Shajal-Kumar Mar 27, 2025
0d49215
Adds test to validate the changes related to the issue to test_string…
Shajal-Kumar Mar 27, 2025
4129c09
Updates changelog.
Shajal-Kumar Mar 27, 2025
265e6c8
Removes large files from PR.
Shajal-Kumar Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master (unreleased)

- Added configurable string length (`min_str_len`) across backends (#1303, #2678)

### New Features

### Breaking Changes
Expand Down
9 changes: 6 additions & 3 deletions capa/features/extractors/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
)
from capa.features.freeze import is_freeze
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress
from capa.features.extractors.strings import DEFAULT_STRING_LENGTH

logger = logging.getLogger(__name__)

Expand All @@ -52,14 +53,16 @@
MATCH_JSON_OBJECT = b'{"'


def extract_file_strings(buf: bytes, **kwargs) -> Iterator[tuple[String, Address]]:
def extract_file_strings(
buf: bytes, min_str_len: int = DEFAULT_STRING_LENGTH, **kwargs
) -> Iterator[tuple[String, Address]]:
"""
extract ASCII and UTF-16 LE strings from file
"""
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
for s in capa.features.extractors.strings.extract_ascii_strings(buf, min_str_len=min_str_len):
yield String(s.s), FileOffsetAddress(s.offset)

for s in capa.features.extractors.strings.extract_unicode_strings(buf):
for s in capa.features.extractors.strings.extract_unicode_strings(buf, min_str_len=min_str_len):
yield String(s.s), FileOffsetAddress(s.offset)


Expand Down
22 changes: 16 additions & 6 deletions capa/features/extractors/dnfile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import capa.features.extractors.dnfile.function
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress
from capa.features.extractors.strings import DEFAULT_STRING_LENGTH
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.base_extractor import (
BBHandle,
Expand Down Expand Up @@ -82,8 +83,9 @@ def get_type(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:


class DnfileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: Path):
def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH):
self.pe: dnfile.dnPE = dnfile.dnPE(str(path))
self.min_str_len = min_str_len
super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes()))

# pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction
Expand All @@ -92,9 +94,9 @@ def __init__(self, path: Path):

# pre-compute these because we'll yield them at *every* scope.
self.global_features: list[tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_format())
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_format(self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(self.pe))

def get_base_address(self):
return NO_ADDRESS
Expand All @@ -103,7 +105,9 @@ def extract_global_features(self):
yield from self.global_features

def extract_file_features(self):
yield from capa.features.extractors.dnfile.file.extract_features(self.pe)
yield from capa.features.extractors.dnfile.file.extract_features(
ctx={"pe": self.pe, "min_str_len": self.min_str_len}
)

def get_functions(self) -> Iterator[FunctionHandle]:
# create a method lookup table
Expand All @@ -112,7 +116,13 @@ def get_functions(self) -> Iterator[FunctionHandle]:
fh: FunctionHandle = FunctionHandle(
address=DNTokenAddress(token),
inner=method,
ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache},
ctx={
"pe": self.pe,
"calls_from": set(),
"calls_to": set(),
"cache": self.token_cache,
"min_str_len": self.min_str_len,
},
)

# method tokens should be unique
Expand Down
38 changes: 22 additions & 16 deletions capa/features/extractors/dnfile/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,43 @@
from capa.features.address import Address


def extract_file_import_names(pe: dnfile.dnPE) -> Iterator[tuple[Import, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe=pe)
def extract_file_import_names(ctx) -> Iterator[tuple[Import, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe)


def extract_file_format(pe: dnfile.dnPE) -> Iterator[tuple[Format, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_format(pe=pe)
def extract_file_format(ctx) -> Iterator[tuple[Format, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_format(pe)


def extract_file_function_names(pe: dnfile.dnPE) -> Iterator[tuple[FunctionName, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_function_names(pe=pe)
def extract_file_function_names(ctx) -> Iterator[tuple[FunctionName, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_function_names(pe)


def extract_file_strings(pe: dnfile.dnPE) -> Iterator[tuple[String, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_strings(pe=pe)
def extract_file_strings(ctx) -> Iterator[tuple[String, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_strings(ctx)


def extract_file_mixed_mode_characteristic_features(pe: dnfile.dnPE) -> Iterator[tuple[Characteristic, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_mixed_mode_characteristic_features(pe=pe)
def extract_file_mixed_mode_characteristic_features(ctx) -> Iterator[tuple[Characteristic, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_mixed_mode_characteristic_features(pe)


def extract_file_namespace_features(pe: dnfile.dnPE) -> Iterator[tuple[Namespace, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_namespace_features(pe=pe)
def extract_file_namespace_features(ctx) -> Iterator[tuple[Namespace, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_namespace_features(pe)


def extract_file_class_features(pe: dnfile.dnPE) -> Iterator[tuple[Class, Address]]:
yield from capa.features.extractors.dotnetfile.extract_file_class_features(pe=pe)
def extract_file_class_features(ctx) -> Iterator[tuple[Class, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
yield from capa.features.extractors.dotnetfile.extract_file_class_features(pe)


def extract_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]:
def extract_features(ctx) -> Iterator[tuple[Feature, Address]]:
for file_handler in FILE_HANDLERS:
for feature, address in file_handler(pe):
for feature, address in file_handler(ctx={"pe": ctx["pe"], "min_str_len": ctx["min_str_len"]}):
yield feature, address


Expand Down
2 changes: 1 addition & 1 deletion capa/features/extractors/dnfile/insn.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def extract_insn_string_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iter
if user_string is None:
return

if len(user_string) >= 4:
if len(user_string) >= fh.ctx["min_str_len"]:
yield String(user_string), ih.address


Expand Down
44 changes: 26 additions & 18 deletions capa/features/extractors/dotnetfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Characteristic,
)
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress
from capa.features.extractors.strings import DEFAULT_STRING_LENGTH
from capa.features.extractors.dnfile.types import DnType
from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor
from capa.features.extractors.dnfile.helpers import (
Expand All @@ -55,12 +56,13 @@
logger = logging.getLogger(__name__)


def extract_file_format(**kwargs) -> Iterator[tuple[Format, Address]]:
def extract_file_format(ctx) -> Iterator[tuple[Format, Address]]:
yield Format(FORMAT_DOTNET), NO_ADDRESS
yield Format(FORMAT_PE), NO_ADDRESS


def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Import, Address]]:
def extract_file_import_names(ctx) -> Iterator[tuple[Import, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
for method in get_dotnet_managed_imports(pe):
# like System.IO.File::OpenRead
yield Import(str(method)), DNTokenAddress(method.token)
Expand All @@ -71,16 +73,18 @@ def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Impor
yield Import(name), DNTokenAddress(imp.token)


def extract_file_function_names(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[FunctionName, Address]]:
def extract_file_function_names(ctx) -> Iterator[tuple[FunctionName, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
for method in get_dotnet_managed_methods(pe):
yield FunctionName(str(method)), DNTokenAddress(method.token)


def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Namespace, Address]]:
def extract_file_namespace_features(ctx) -> Iterator[tuple[Namespace, Address]]:
"""emit namespace features from TypeRef and TypeDef tables"""

# namespaces may be referenced multiple times, so we need to filter
namespaces = set()
pe = ctx["pe"] if isinstance(ctx, dict) else ctx

for _, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
# emit internal .NET namespaces
Expand All @@ -100,8 +104,9 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple
yield Namespace(namespace), NO_ADDRESS


def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Class, Address]]:
def extract_file_class_features(ctx) -> Iterator[tuple[Class, Address]]:
"""emit class features from TypeRef and TypeDef tables"""
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
nested_class_table = get_dotnet_nested_class_table_index(pe)

for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
Expand All @@ -123,13 +128,14 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla
yield Class(DnType.format_name(typerefname, namespace=typerefnamespace)), DNTokenAddress(token)


def extract_file_os(**kwargs) -> Iterator[tuple[OS, Address]]:
def extract_file_os(ctx) -> Iterator[tuple[OS, Address]]:
yield OS(OS_ANY), NO_ADDRESS


def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Arch, Address]]:
def extract_file_arch(ctx) -> Iterator[tuple[Arch, Address]]:
# to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020
# .NET 4.5 added option: any CPU, 32-bit preferred
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
assert pe.net is not None
assert pe.net.Flags is not None

Expand All @@ -141,20 +147,21 @@ def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Arch, Address
yield Arch(ARCH_ANY), NO_ADDRESS


def extract_file_strings(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[String, Address]]:
yield from capa.features.extractors.common.extract_file_strings(pe.__data__)
def extract_file_strings(ctx) -> Iterator[tuple[String, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
min_str_len = ctx.get("min_str_len", DEFAULT_STRING_LENGTH) if isinstance(ctx, dict) else DEFAULT_STRING_LENGTH
yield from capa.features.extractors.common.extract_file_strings(pe.__data__, min_str_len)


def extract_file_mixed_mode_characteristic_features(
pe: dnfile.dnPE, **kwargs
) -> Iterator[tuple[Characteristic, Address]]:
def extract_file_mixed_mode_characteristic_features(ctx) -> Iterator[tuple[Characteristic, Address]]:
pe = ctx["pe"] if isinstance(ctx, dict) else ctx
if is_dotnet_mixed_mode(pe):
yield Characteristic("mixed mode"), NO_ADDRESS


def extract_file_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]:
def extract_file_features(ctx) -> Iterator[tuple[Feature, Address]]:
for file_handler in FILE_HANDLERS:
for feature, addr in file_handler(pe=pe): # type: ignore
for feature, addr in file_handler(ctx): # type: ignore
yield feature, addr


Expand All @@ -169,9 +176,9 @@ def extract_file_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]:
)


def extract_global_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]]:
def extract_global_features(ctx) -> Iterator[tuple[Feature, Address]]:
for handler in GLOBAL_HANDLERS:
for feature, va in handler(pe=pe): # type: ignore
for feature, va in handler(ctx): # type: ignore
yield feature, va


Expand All @@ -182,10 +189,11 @@ def extract_global_features(pe: dnfile.dnPE) -> Iterator[tuple[Feature, Address]


class DotnetFileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: Path):
def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH):
super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes()))
self.path: Path = path
self.pe: dnfile.dnPE = dnfile.dnPE(str(path))
self.min_str_len = min_str_len

def get_base_address(self):
return NO_ADDRESS
Expand All @@ -203,7 +211,7 @@ def extract_global_features(self):
yield from extract_global_features(self.pe)

def extract_file_features(self):
yield from extract_file_features(self.pe)
yield from extract_file_features(ctx={"pe": self.pe, "min_str_len": DEFAULT_STRING_LENGTH})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

It seems like DEFAULT_STRING_LENGTH is used here, but the DotnetFileFeatureExtractor is initialized with min_str_len (which is stored as self.min_str_len).

Should this be using self.min_str_len to ensure the configured minimum string length is respected for file feature extraction in this extractor, similar to how it's handled in other extractors like PefileFeatureExtractor or ElfFeatureExtractor?

Suggested change
yield from extract_file_features(ctx={"pe": self.pe, "min_str_len": DEFAULT_STRING_LENGTH})
yield from extract_file_features(ctx={"pe": self.pe, "min_str_len": self.min_str_len})


def is_dotnet_file(self) -> bool:
return bool(self.pe.net)
Expand Down
26 changes: 17 additions & 9 deletions capa/features/extractors/elffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
from capa.features.file import Export, Import, Section
from capa.features.common import OS, FORMAT_ELF, Arch, Format, Feature
from capa.features.address import NO_ADDRESS, FileOffsetAddress, AbsoluteVirtualAddress
from capa.features.extractors.strings import DEFAULT_STRING_LENGTH
from capa.features.extractors.base_extractor import SampleHashes, StaticFeatureExtractor

logger = logging.getLogger(__name__)


def extract_file_export_names(elf: ELFFile, **kwargs):
def extract_file_export_names(ctx):
elf = ctx["elf"] if isinstance(ctx, dict) else ctx

for section in elf.iter_sections():
if not isinstance(section, SymbolTableSection):
continue
Expand Down Expand Up @@ -79,7 +82,8 @@ def extract_file_export_names(elf: ELFFile, **kwargs):
yield Export(symbol.name), AbsoluteVirtualAddress(symbol.entry.st_value)


def extract_file_import_names(elf: ELFFile, **kwargs):
def extract_file_import_names(ctx):
elf = ctx["elf"] if isinstance(ctx, dict) else ctx
symbol_name_by_index: dict[int, str] = {}

# Extract symbol names and store them in the dictionary
Expand Down Expand Up @@ -139,16 +143,19 @@ def extract_file_import_names(elf: ELFFile, **kwargs):
yield Import(symbol_name), FileOffsetAddress(symbol_address)


def extract_file_section_names(elf: ELFFile, **kwargs):
def extract_file_section_names(ctx):
elf = ctx["elf"] if isinstance(ctx, dict) else ctx

for section in elf.iter_sections():
if section.name:
yield Section(section.name), AbsoluteVirtualAddress(section.header.sh_addr)
elif section.is_null():
yield Section("NULL"), AbsoluteVirtualAddress(section.header.sh_addr)


def extract_file_strings(buf, **kwargs):
yield from capa.features.extractors.common.extract_file_strings(buf)
def extract_file_strings(ctx):

yield from capa.features.extractors.common.extract_file_strings(ctx["buf"], ctx["min_str_len"])


def extract_file_os(elf: ELFFile, buf, **kwargs):
Expand Down Expand Up @@ -179,9 +186,9 @@ def extract_file_arch(elf: ELFFile, **kwargs):
logger.warning("unsupported architecture: %s", arch)


def extract_file_features(elf: ELFFile, buf: bytes) -> Iterator[tuple[Feature, int]]:
def extract_file_features(ctx) -> Iterator[tuple[Feature, int]]:
for file_handler in FILE_HANDLERS:
for feature, addr in file_handler(elf=elf, buf=buf): # type: ignore
for feature, addr in file_handler(ctx=ctx): # type: ignore
yield feature, addr


Expand All @@ -208,9 +215,10 @@ def extract_global_features(elf: ELFFile, buf: bytes) -> Iterator[tuple[Feature,


class ElfFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: Path):
def __init__(self, path: Path, min_str_len: int = DEFAULT_STRING_LENGTH):
super().__init__(SampleHashes.from_bytes(path.read_bytes()))
self.path: Path = path
self.min_str_len = min_str_len
self.elf = ELFFile(io.BytesIO(path.read_bytes()))

def get_base_address(self):
Expand All @@ -228,7 +236,7 @@ def extract_global_features(self):
def extract_file_features(self):
buf = self.path.read_bytes()

for feature, addr in extract_file_features(self.elf, buf):
for feature, addr in extract_file_features(ctx={"elf": self.elf, "buf": buf, "min_str_len": self.min_str_len}):
yield feature, addr

def get_functions(self):
Expand Down
Loading
Loading