1
0
mirror of synced 2024-09-24 11:28:25 +02:00

Teach x86 emulator how to emulate a whole function, initialize memory to DLL/EXE contents, and teach psmap how to emulate multiple sections.

This commit is contained in:
Jennifer Taylor 2021-09-04 02:48:49 +00:00
parent b14c9cb90d
commit 4bc82bed61
2 changed files with 147 additions and 90 deletions

View File

@ -1,13 +1,14 @@
import pefile # type: ignore
import struct
import sys
from iced_x86 import Decoder, Formatter, FormatterSyntax, FormatMnemonicOptions
from iced_x86 import Decoder, Instruction, Formatter, FormatterSyntax, FormatMnemonicOptions
from typing import Any, List, Dict, Optional
class Memory:
def __init__(self) -> None:
self.values: Dict[int, int] = {}
self.defaults: Dict[int, bytes] = {}
def store(self, offset: int, data: bytes) -> None:
for i, b in enumerate(data):
@ -18,9 +19,17 @@ class Memory:
for i in range(offset, offset + length):
if i in self.values:
# Return modified value.
data.append(self.values[i])
else:
data.append(0)
# Attempt to return the default.
for virtual_start in self.defaults:
if offset >= virtual_start and offset < (virtual_start + len(self.defaults[virtual_start])):
data.append(self.defaults[virtual_start][offset - virtual_start])
break
else:
# Nothing here, return initialized RAM.
data.append(0)
return bytes(data)
@ -40,6 +49,16 @@ class Registers:
self.sf = False
class JumpException(Exception):
def __init__(self, address: int, message: str) -> None:
super().__init__(message)
self.address = address
class RetException(Exception):
pass
class PEFile:
def __init__(self, data: bytes) -> None:
self.data = data
@ -88,6 +107,76 @@ class PEFile:
# Assume this is virtual
end = self.virtual_to_physical(end)
registers = Registers(0xFFFFFFFFFFFFFFFF if self.is_64bit() else 0xFFFFFFFF)
memory = self.__to_memory()
decoder = Decoder(64 if self.is_64bit() else 32, self.data[start:end], ip=self.physical_to_virtual(start))
self.__emulate_chunk(registers, memory, [i for i in decoder], verbose)
# Replace memory that we care about.
self.__update(memory)
def emulate_function(self, start: int, verbose: bool = False) -> None:
if self.is_virtual(start):
# Assume this is virtual
start = self.virtual_to_physical(start)
registers = Registers(0xFFFFFFFFFFFFFFFF if self.is_64bit() else 0xFFFFFFFF)
memory = self.__to_memory()
# Need to fetch one at a time, emulating until we get a ret.
loc = start
end = len(self.data)
while True:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[loc:end], ip=self.physical_to_virtual(loc))
chunk = [decoder.decode()]
try:
# First attempt to just run the instruction as normal.
self.__emulate_chunk(registers, memory, chunk, verbose)
loc = self.virtual_to_physical(chunk[0].next_ip)
except JumpException as jmp:
# We need to jump elsewhere.
loc = self.virtual_to_physical(jmp.address)
except RetException:
# We're done!
break
# Replace memory that we care about.
self.__update(memory)
def __to_memory(self) -> Memory:
memory = Memory()
for section in self.__pe.sections:
virtual = section.VirtualAddress + self.__pe.OPTIONAL_HEADER.ImageBase
length = section.SizeOfRawData
physical = self.virtual_to_physical(virtual)
memory.defaults[virtual] = self.data[physical:(physical + length)]
for virtual, physical in self.__adhoc_mapping.items():
memory.values[virtual] = self.data[physical]
return memory
def __update(self, memory: Memory) -> None:
newdata = [x for x in self.data]
for virtual in sorted(memory.values):
try:
physical = self.virtual_to_physical(virtual)
newdata[physical] = memory.values[virtual]
except Exception:
# This is outside of the data we are tracking. Its really not ideal
# that we are just shoving this at the end of the data, but it should
# work for what we care about.
physical = len(newdata)
self.__adhoc_mapping[virtual] = physical
newdata.append(memory.values[virtual])
self.data = bytes(newdata)
self.__pe = pefile.PE(data=self.data, fast_load=True)
def __emulate_chunk(self, registers: Registers, memory: Memory, chunk: List[Instruction], verbose: bool) -> None:
if verbose:
def vprint(*args: Any, **kwargs: Any) -> None:
print(*args, **kwargs, file=sys.stderr)
@ -95,16 +184,28 @@ class PEFile:
def vprint(*args: Any, **kwargs: Any) -> None:
pass
registers = Registers(0xFFFFFFFFFFFFFFFF if self.is_64bit() else 0xFFFFFFFF)
memory = Memory()
# Stuck here so that jump can bind to it.
loc: int = 0
def jump(destination: int) -> None:
nonlocal loc
for i in range(len(chunk)):
if chunk[i].ip == destination:
# Jump to this instruction.
loc = i
break
else:
if destination == chunk[-1].next_ip:
# Jump to the end, we're done.
loc = len(chunk)
else:
raise JumpException(destination, f"Jumping to {hex(destination)} which is outside of our evaluation range!")
formatter = Formatter(FormatterSyntax.NASM) # type: ignore
decoder = Decoder(64 if self.is_64bit() else 32, self.data[start:end], ip=self.physical_to_virtual(start))
insts = [i for i in decoder]
loc = 0
while loc < len(insts):
inst = insts[loc]
while loc < len(chunk):
inst = chunk[loc]
loc = loc + 1
mnemonic = formatter.format_mnemonic(inst, FormatMnemonicOptions.NO_PREFIXES) # type: ignore
@ -154,7 +255,7 @@ class PEFile:
const = None
vprint(f"imul {dest}, {mult}")
size = get_size(amt) or get_size(dest) or (get_size(const) if const is not None else None)
size = get_size(mult) or get_size(dest) or (get_size(const) if const is not None else None)
if size is None:
raise Exception(f"Could not determine size of {mnemonic} operation!")
if const is None:
@ -217,16 +318,7 @@ class PEFile:
destination = get_value(dest)
if destination is None:
raise Exception(f"Jumping to unsupported destination {dest}")
dest_off = self.virtual_to_physical(destination)
if dest_off == end:
loc = len(insts)
elif dest_off < start or dest_off > end:
raise Exception(f"Jumping to {hex(destination)} which is outside of our evaluation range!")
else:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[dest_off:end], ip=self.physical_to_virtual(dest_off))
insts = [i for i in decoder]
loc = 0
jump(destination)
elif mnemonic == "je":
dest = formatter.format_operand(inst, 0)
@ -237,16 +329,7 @@ class PEFile:
destination = get_value(dest)
if destination is None:
raise Exception(f"Jumping to unsupported destination {dest}")
dest_off = self.virtual_to_physical(destination)
if dest_off == end:
loc = len(insts)
elif dest_off < start or dest_off > end:
raise Exception(f"Jumping to {hex(destination)} which is outside of our evaluation range!")
else:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[dest_off:end], ip=self.physical_to_virtual(dest_off))
insts = [i for i in decoder]
loc = 0
jump(destination)
elif mnemonic == "jns":
dest = formatter.format_operand(inst, 0)
@ -257,16 +340,7 @@ class PEFile:
destination = get_value(dest)
if destination is None:
raise Exception(f"Jumping to unsupported destination {dest}")
dest_off = self.virtual_to_physical(destination)
if dest_off == end:
loc = len(insts)
elif dest_off < start or dest_off > end:
raise Exception(f"Jumping to {hex(destination)} which is outside of our evaluation range!")
else:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[dest_off:end], ip=self.physical_to_virtual(dest_off))
insts = [i for i in decoder]
loc = 0
jump(destination)
elif mnemonic == "js":
dest = formatter.format_operand(inst, 0)
@ -277,16 +351,7 @@ class PEFile:
destination = get_value(dest)
if destination is None:
raise Exception(f"Jumping to unsupported destination {dest}")
dest_off = self.virtual_to_physical(destination)
if dest_off == end:
loc = len(insts)
elif dest_off < start or dest_off > end:
raise Exception(f"Jumping to {hex(destination)} which is outside of our evaluation range!")
else:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[dest_off:end], ip=self.physical_to_virtual(dest_off))
insts = [i for i in decoder]
loc = 0
jump(destination)
elif mnemonic == "jmp":
dest = formatter.format_operand(inst, 0)
@ -296,16 +361,7 @@ class PEFile:
destination = get_value(dest)
if destination is None:
raise Exception(f"Jumping to unsupported destination {dest}")
dest_off = self.virtual_to_physical(destination)
if dest_off == end:
loc = len(insts)
elif dest_off < start or dest_off > end:
raise Exception(f"Jumping to {hex(destination)} which is outside of our evaluation range!")
else:
decoder = Decoder(64 if self.is_64bit() else 32, self.data[dest_off:end], ip=self.physical_to_virtual(dest_off))
insts = [i for i in decoder]
loc = 0
jump(destination)
elif mnemonic == "and":
dest = formatter.format_operand(inst, 0)
@ -357,26 +413,14 @@ class PEFile:
raise Exception(f"Could not compute effective address for {mnemonic} operation!")
assign(registers, memory, size, dest, result)
elif mnemonic == "ret":
vprint("ret")
raise RetException("Encountered {mnemonic} instruction but we aren't in function context!")
else:
raise Exception(f"Unsupported mnemonic {mnemonic}!")
# Replace memory that we care about.
newdata = [x for x in self.data]
for virtual in sorted(memory.values):
try:
physical = self.virtual_to_physical(virtual)
newdata[physical] = memory.values[virtual]
except Exception:
# This is outside of the data we are tracking. Its really not ideal
# that we are just shoving this at the end of the data, but it should
# work for what we care about.
physical = len(newdata)
self.__adhoc_mapping[virtual] = physical
newdata.append(memory.values[virtual])
self.data = bytes(newdata)
self.__pe = pefile.PE(data=self.data, fast_load=True)
def sanitize(indirect: str) -> str:
"""

View File

@ -237,22 +237,28 @@ def main() -> None:
default="root",
)
parser.add_argument(
"--emulate-start",
"--emulate-code",
help=(
"Hex offset where we should start emulating x86/x64 code to reconstuct a dynamic psmap structure. "
"This can be specified as either a raw offset into the DLL or as a virtual offset."
"Hex offset pair of addresses where we should emulate x86/x64 code to "
"reconstuct a dynamic psmap structure, separated by a colon. This can "
"be specified as either a raw offset into the DLL or as a virtual offset. "
"If multiple sections must be emulated you can specify this multiple times."
),
type=str,
default=None,
action='append',
default=[],
)
parser.add_argument(
"--emulate-end",
"--emulate-function",
help=(
"Hex offset where we should finish emulating x86/x64 code to reconstuct a dynamic psmap structure. "
"This can be specified as either a raw offset into the DLL or as a virtual offset."
"Hex offset address of a function that we should emulate to reconstruct a "
"dynamic psmap structure. This can be specified as either a raw offset into "
"the DLL or as a virtual offset. If multiple functions must be emulated you "
"can specify this multiple times."
),
type=str,
default=None,
action='append',
default=[],
)
parser.add_argument(
"--verbose",
@ -269,10 +275,17 @@ def main() -> None:
pe = PEFile(data=data)
# If asked, attempt to emulate code which dynamically constructs a psmap structure.
if args.emulate_start and args.emulate_end:
start = int(args.emulate_start, 16)
end = int(args.emulate_end, 16)
pe.emulate_code(start, end, verbose=args.verbose)
if args.emulate_code:
for chunk in args.emulate_code:
emulate_start, emulate_end = chunk.split(':', 1)
start = int(emulate_start, 16)
end = int(emulate_end, 16)
pe.emulate_code(start, end, verbose=args.verbose)
if args.emulate_function:
for function_address in args.emulate_function:
fun = int(function_address, 16)
pe.emulate_function(fun, verbose=args.verbose)
layout = parse_psmap(pe, args.offset, args.root, verbose=args.verbose)