Skip to content

Reformatting JSON to use context based on PE files #1109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: quantumstrand
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 148 additions & 5 deletions floss/render/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
import datetime
import dataclasses
import pefile
import logging

from floss.results import ResultDocument

Expand All @@ -23,9 +25,150 @@ def default(self, o):
return super().default(o)


def create_custom_format(doc: ResultDocument) -> dict:
    """Format the ResultDocument into a custom JSON-serializable structure.

    The analyzed binary (``doc.metadata.file_path``) is parsed as a PE file,
    when possible, so each static string offset can be annotated with the PE
    header, section, or import-table entry it falls into.  If PE parsing
    fails, strings are still emitted, just without context.

    Args:
        doc: the FLOSS result document to reformat.

    Returns:
        A dict with two top-level keys:
          - "strings": static/stack/tight/decoded string lists plus a
            "context" mapping of static-string offset -> location metadata
          - "file_layout": basic metadata about the analyzed file
    """
    static_strings = []
    context = {}

    pe = None
    try:
        pe = pefile.PE(doc.metadata.file_path)
    except Exception as e:
        # include the path so a failure can be traced to a specific sample
        logging.warning(f"Failed to parse PE file: {doc.metadata.file_path} - {e}")

    # file-offset ranges covered by the PE headers: (start, end, header_type)
    pe_headers = []
    # (start, end) file-offset range -> section name
    pe_sections = {}
    # imported API names, and the DLL each one comes from
    import_entries = set()
    import_dll_map = {}

    if pe is not None:
        pe_headers.append((0, pe.DOS_HEADER.e_lfanew + 4, "DOS_HEADER"))
        if hasattr(pe, "NT_HEADERS"):
            pe_headers.append(
                (
                    pe.DOS_HEADER.e_lfanew,
                    pe.DOS_HEADER.e_lfanew + pe.NT_HEADERS.sizeof(),
                    "NT_HEADERS",
                )
            )

        for section in pe.sections:
            start = section.PointerToRawData
            end = start + section.SizeOfRawData
            name = section.Name.decode("utf-8", "ignore").strip("\x00")
            pe_sections[(start, end)] = name

        if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
            for entry in pe.DIRECTORY_ENTRY_IMPORT:
                dll_name = entry.dll.decode("utf-8", "ignore") if entry.dll else "unknown"
                for imp in entry.imports:
                    if imp.name:
                        func_name = imp.name.decode("utf-8", "ignore")
                        import_entries.add(func_name)
                        import_dll_map[func_name] = dll_name

    for string in doc.strings.static_strings:
        offset = string.offset
        static_strings.append(
            {
                "encoding": string.encoding.value.lower(),
                "offset": offset,
                "length": len(string.string),
                "string": string.string,
            }
        )

        if pe is None:
            continue

        # header context first; a more specific section match below overwrites it
        for start, end, htype in pe_headers:
            if start <= offset < end:
                context[offset] = {
                    "structure": "pe.header",
                    "header_type": htype,
                    "tags": ["#common"],
                }
                break

        for (start, end), name in pe_sections.items():
            if start <= offset < end:
                structure_name = f"section{name}" if name.startswith(".") else f"section.{name}"
                context[offset] = {
                    "structure": structure_name,
                    "parent_structure": "pe",
                    "tags": ["#section"],
                }

                # strings matching an imported API name get import-table context
                if string.string in import_entries:
                    context[offset] = {
                        "structure": "import table",
                        "parent_structure": structure_name,
                        "dll": import_dll_map.get(string.string, "unknown"),
                        "tags": ["#winapi", "#common"],
                    }
                break

    def _runtime_entry(s) -> dict:
        # stack strings and tight strings share the same output shape
        return {
            "encoding": s.encoding.value.lower(),
            "offset": s.program_counter,
            "length": len(s.string),
            "string": s.string,
            "function": s.function,
            "stack_pointer": s.stack_pointer,
            "frame_offset": s.frame_offset,
        }

    stack_strings = [_runtime_entry(s) for s in doc.strings.stack_strings]
    tight_strings = [_runtime_entry(s) for s in doc.strings.tight_strings]

    decoded_strings = [
        {
            "encoding": s.encoding.value.lower(),
            "offset": s.decoding_routine,
            "length": len(s.string),
            "string": s.string,
            "decoded_at": s.decoded_at,
            "address": s.address,
            "address_type": s.address_type,
        }
        for s in doc.strings.decoded_strings
    ]

    # release the memory-mapped file held by pefile
    if pe is not None:
        pe.close()

    return {
        "strings": {
            "static_strings": static_strings,
            "stack_strings": stack_strings,
            "tight_strings": tight_strings,
            "decoded_strings": decoded_strings,
            "context": context,
        },
        "file_layout": {
            "file_path": doc.metadata.file_path,
            "imagebase": doc.metadata.imagebase,
            "min_length": doc.metadata.min_length,
        },
    }


def render(doc: ResultDocument) -> str:
    """Serialize *doc* to a JSON string in the custom, context-aware format.

    NOTE(review): the old FlossJSONEncoder path returned before the custom
    format was ever built, leaving the new code unreachable; removed.
    """
    custom_format = create_custom_format(doc)
    return json.dumps(custom_format, indent=4)