1
0
mirror of synced 2024-11-12 01:00:46 +01:00

Implement loop break/continue/goto processing.

This commit is contained in:
Jennifer Taylor 2021-04-24 18:01:25 +00:00
parent 738fce36c9
commit 6e34d2647e
2 changed files with 245 additions and 37 deletions

View File

@ -1,14 +1,14 @@
import os import os
from typing import Any, Dict, List, Tuple, Set, Union, Optional, cast from typing import Any, Dict, List, Sequence, Tuple, Set, Union, Optional, cast
from .types import AP2Action, JumpAction, IfAction from .types import AP2Action, JumpAction, IfAction, DefineFunction2Action
from .util import VerboseOutput from .util import VerboseOutput
class ByteCode: class ByteCode:
# A list of bytecodes to execute. # A list of bytecodes to execute.
def __init__(self, actions: Sequence[AP2Action], end_offset: int) -> None:
    """Record the bytecode's end offset and keep the actions as a new list.

    The incoming sequence is materialized into a list of its own rather
    than stored by reference.
    """
    self.end_offset = end_offset
    self.actions = [*actions]
def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
@ -72,23 +72,82 @@ class ControlFlow:
return f"ControlFlow(beginning={self.beginning}, end={self.end}, next={(', '.join(str(n) for n in self.next_flow)) or 'N/A'}" return f"ControlFlow(beginning={self.beginning}, end={self.end}, next={(', '.join(str(n) for n in self.next_flow)) or 'N/A'}"
class ConvertedAction:
    """Base type for an action that has been analyzed and converted to an
    intermediate representation. It carries no state or behavior of its own."""
# Type alias: either a raw AP2Action or an already-converted intermediate action.
ArbitraryOpcode = Union[AP2Action, ConvertedAction]
class BreakStatement(ConvertedAction):
    """A break out of a loop: execution is forced to the next line after the loop."""

    def __repr__(self) -> str:
        # Rendered exactly as it should appear in the pseudocode output.
        return "break;"
class ContinueStatement(ConvertedAction):
    """A continue inside a loop: execution is forced back to the top of the loop."""

    def __repr__(self) -> str:
        # Rendered exactly as it should appear in the pseudocode output.
        return "continue;"
class GotoStatement(ConvertedAction):
    """A goto that remembers the ID of the chunk execution should jump to."""

    def __init__(self, location: int) -> None:
        # ID of the destination chunk; used to build the label name in __repr__.
        self.location = location

    def __repr__(self) -> str:
        return f"goto label_{self.location};"
class IntermediateIfStatement(ConvertedAction):
    """An if statement whose branch bodies are already-converted actions.

    Holds the originating IfAction, the actions for the true and false
    branches (each copied into a list of its own), and a flag saying whether
    the condition is printed negated.
    """

    def __init__(self, parent_action: IfAction, true_actions: Sequence[ArbitraryOpcode], false_actions: Sequence[ArbitraryOpcode], negate: bool) -> None:
        self.parent_action = parent_action
        self.true_actions = [*true_actions]
        self.false_actions = [*false_actions]
        self.negate = negate

    def __repr__(self) -> str:
        def render_branch(actions: Sequence[ArbitraryOpcode]) -> str:
            # Stringify each action and indent every resulting line, so that
            # multi-line (nested) actions stay visually inside this branch.
            return os.linesep.join(
                f" {line}"
                for action in actions
                for line in str(action).split(os.linesep)
            )

        true_body = render_branch(self.true_actions)
        false_body = render_branch(self.false_actions)

        bang = "!" if self.negate else ""
        pieces = [f"if <{bang}{self.parent_action}> {{", true_body]
        if self.false_actions:
            # Only emit the else arm when there is something to put in it.
            pieces += ["} else {", false_body]
        pieces.append("}")
        return os.linesep.join(pieces)
class ByteCodeChunk: class ByteCodeChunk:
def __init__(self, id: int, actions: Sequence[ArbitraryOpcode], next_chunks: List[int], previous_chunks: Optional[List[int]] = None) -> None:
    """Create a chunk of contiguous actions plus its graph links.

    Args:
        id: Identifier of this chunk.
        actions: The actions in the chunk; copied into a new list.
        next_chunks: Chunks this chunk points at. Stored by reference,
            not copied.
        previous_chunks: Chunks that point at this chunk. When omitted,
            None, or empty, a fresh empty list is used instead.
    """
    self.id = id
    self.actions = list(actions)
    self.next_chunks = next_chunks
    # The default is None rather than a literal [] so that no list object is
    # ever shared through the function's default; a falsy argument still
    # yields a brand-new list, exactly as before.
    self.previous_chunks = previous_chunks or []
# Offset of the first action held by this chunk, or None when the chunk
# holds no actions at all.
@property
def offset(self) -> Optional[int]:
if self.actions:
return self.actions[0].offset
return None
def __repr__(self) -> str: def __repr__(self) -> str:
entries: List[str] = [] entries: List[str] = []
for action in self.actions: for action in self.actions:
entries.extend([f" {s}" for s in str(action).split(os.linesep)]) if isinstance(action, DefineFunction2Action):
# Special case, since we will decompile this later, we don't want to print it now.
entries.append(f" {action.offset}: {AP2Action.action_to_name(action.opcode)}, Name: {action.name or '<anonymous function>'}, Flags: {hex(action.flags)}")
else:
entries.extend([f" {s}" for s in str(action).split(os.linesep)])
return ( return (
f"ByteCodeChunk({os.linesep}" + f"ByteCodeChunk({os.linesep}" +
@ -104,7 +163,7 @@ ArbitraryCodeChunk = Union[ByteCodeChunk, "Loop"]
class Loop: class Loop:
def __init__(self, id: int, chunks: List[ArbitraryCodeChunk]) -> None: def __init__(self, id: int, chunks: Sequence[ArbitraryCodeChunk]) -> None:
# The ID is usually the chunk that other chunks point into. # The ID is usually the chunk that other chunks point into.
self.id = id self.id = id
@ -113,7 +172,7 @@ class Loop:
self.previous_chunks: List[int] = [] self.previous_chunks: List[int] = []
self.next_chunks: List[int] = [] self.next_chunks: List[int] = []
self.chunks = chunks self.chunks = list(chunks)
for chunk in chunks: for chunk in chunks:
for nextid in chunk.next_chunks: for nextid in chunk.next_chunks:
@ -123,14 +182,6 @@ class Loop:
if previd not in ided_chunks: if previd not in ided_chunks:
self.previous_chunks.append(previd) self.previous_chunks.append(previd)
# Offset of this loop, taken from the member chunk whose ID equals the
# loop's own ID. Raises if no member chunk has that ID.
@property
def offset(self) -> Optional[int]:
for chunk in self.chunks:
if chunk.id == self.id:
return chunk.offset
# We're guaranteed to have a header (the ID), so this is a problem.
raise Exception("Logic error!")
def __repr__(self) -> str: def __repr__(self) -> str:
entries: List[str] = [] entries: List[str] = []
for chunk in self.chunks: for chunk in self.chunks:
@ -204,7 +255,7 @@ class ByteCodeDecompiler(VerboseOutput):
self.bytecode = bytecode self.bytecode = bytecode
def __graph_control_flow(self) -> List[ByteCodeChunk]: def __graph_control_flow(self) -> Tuple[List[ByteCodeChunk], Dict[int, int]]:
# Start by assuming that the whole bytecode never directs flow. This is, confusingly, # Start by assuming that the whole bytecode never directs flow. This is, confusingly,
# indexed by AP2Action offset, not by actual bytecode offset, so we can avoid the # indexed by AP2Action offset, not by actual bytecode offset, so we can avoid the
# prickly problem of opcodes that take more than one byte in the data. # prickly problem of opcodes that take more than one byte in the data.
@ -305,7 +356,7 @@ class ByteCodeDecompiler(VerboseOutput):
flows[current_action_flow].next_flow = [dest_action_flow] flows[current_action_flow].next_flow = [dest_action_flow]
self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk")
elif action.opcode in [AP2Action.IF, AP2Action.IF2]: elif action.opcode == AP2Action.IF:
# Conditional control flow redirection after this, we should split the # Conditional control flow redirection after this, we should split the
# section if necessary and point this section at the new offset as well # section if necessary and point this section at the new offset as well
# as the second half of the split section. # as the second half of the split section.
@ -356,6 +407,9 @@ class ByteCodeDecompiler(VerboseOutput):
flows[current_action_flow].next_flow = [next_action, dest_action_flow] flows[current_action_flow].next_flow = [next_action, dest_action_flow]
self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk")
elif action.opcode == AP2Action.IF2:
# We don't emit this anymore, so this is a problem.
raise Exception("Logic error!")
# Finally, return chunks of contiguous execution. # Finally, return chunks of contiguous execution.
chunks: List[ByteCodeChunk] = [] chunks: List[ByteCodeChunk] = []
@ -383,12 +437,16 @@ class ByteCodeDecompiler(VerboseOutput):
entries: Dict[int, List[int]] = {} entries: Dict[int, List[int]] = {}
offset_to_id: Dict[int, int] = {} offset_to_id: Dict[int, int] = {}
for chunk in chunks: for chunk in chunks:
offset_to_id[chunk.offset] = chunk.id # We haven't emitted any non-AP2Actions yet, so we are safe in casting here.
chunk_offset = cast(AP2Action, chunk.actions[0]).offset
offset_to_id[chunk_offset] = chunk.id
for next_chunk in chunk.next_chunks: for next_chunk in chunk.next_chunks:
entries[next_chunk] = entries.get(next_chunk, []) + [chunk.offset] entries[next_chunk] = entries.get(next_chunk, []) + [chunk_offset]
for chunk in chunks: for chunk in chunks:
chunk.previous_chunks = entries.get(chunk.offset, []) # We haven't emitted any non-AP2Actions yet, so we are safe in casting here.
chunk_offset = cast(AP2Action, chunk.actions[0]).offset
chunk.previous_chunks = entries.get(chunk_offset, [])
# Now, convert the offsets to chunk ID pointers. # Now, convert the offsets to chunk ID pointers.
end_previous_chunks: List[int] = [] end_previous_chunks: List[int] = []
@ -402,10 +460,13 @@ class ByteCodeDecompiler(VerboseOutput):
end_previous_chunks.append(chunk.id) end_previous_chunks.append(chunk.id)
chunk.previous_chunks = [offset_to_id[c] for c in chunk.previous_chunks] chunk.previous_chunks = [offset_to_id[c] for c in chunk.previous_chunks]
# Add the "return" chunk now that we've converted everything.
chunks.append(ByteCodeChunk(chunkid, [], [], previous_chunks=end_previous_chunks)) chunks.append(ByteCodeChunk(chunkid, [], [], previous_chunks=end_previous_chunks))
return sorted(chunks, key=lambda c: c.id) offset_to_id[self.bytecode.end_offset] = chunkid
def __get_entry_block(self, chunks: List[ByteCodeChunk]) -> int: return (sorted(chunks, key=lambda c: c.id), offset_to_id)
def __get_entry_block(self, chunks: Sequence[ByteCodeChunk]) -> int:
start_id: int = -1 start_id: int = -1
for chunk in chunks: for chunk in chunks:
if not chunk.previous_chunks: if not chunk.previous_chunks:
@ -420,7 +481,7 @@ class ByteCodeDecompiler(VerboseOutput):
raise Exception("Logic error!") raise Exception("Logic error!")
return start_id return start_id
def __compute_dominators(self, chunks: List[ByteCodeChunk]) -> Dict[int, Set[int]]: def __compute_dominators(self, chunks: Sequence[ByteCodeChunk]) -> Dict[int, Set[int]]:
# Find the start of the graph (the node with no previous entries). # Find the start of the graph (the node with no previous entries).
start_id = self.__get_entry_block(chunks) start_id = self.__get_entry_block(chunks)
@ -445,7 +506,136 @@ class ByteCodeDecompiler(VerboseOutput):
return {chunk.id: dominators[chunk.id].bitsSet for chunk in chunks} return {chunk.id: dominators[chunk.id].bitsSet for chunk in chunks}
def __analyze_loop_jumps(self, loop: Loop, offset_map: Dict[int, int]) -> Loop:
    """Rewrite jumps that leave or restart a loop as break/continue/goto.

    The loop header is the member chunk whose ID equals the loop's ID. Its
    single successor outside the loop is the break point, and the header
    itself is the continue point. Every directly contained ByteCodeChunk
    that ends in a JUMP or IF is inspected: jump targets equal to the break
    point become BreakStatement, targets equal to the continue point become
    ContinueStatement, and any other target outside the loop becomes a
    GotoStatement. Converted targets are removed from the chunk's
    next_chunks. Nested Loop members are left untouched.

    Args:
        loop: The loop to fix up. It is modified in place.
        offset_map: Maps an action offset to the ID of the chunk starting
            at that offset; used to resolve the true branch of an IF.

    Returns:
        The same loop object, with next_chunks set to just the break point.

    Raises:
        Exception: "Logic error!" when the header is not unique, when the
            header does not have exactly one exit out of the loop, when a
            member has no next chunk, when a ByteCodeChunk has no actions,
            when a JUMP chunk does not have exactly one successor, or when
            an IF chunk does not have exactly one false-branch successor.
    """
    # Go through and try to determine which jumps are "break" and "continue" statements based on
    # where they point (to the header or to the exit point). First, let's try to identify all
    # exits, and which one is the break point and which ones are possibly goto statements
    # (break out of multiple loop depths).
    internal_jump_points = {c.id for c in loop.chunks}

    header_chunks = [c for c in loop.chunks if c.id == loop.id]
    if len(header_chunks) != 1:
        # Should never happen, only one should match ID.
        raise Exception("Logic error!")
    header_chunk = header_chunks[0]

    # Identify external jumps from the header.
    break_points = [i for i in header_chunk.next_chunks if i not in internal_jump_points]
    if len(break_points) > 1:
        # We should not have two exits here, if so this isn't a loop!
        raise Exception("Logic error!")
    if not break_points:
        # This might be possible, but I don't know how to deal with it.
        raise Exception("Logic error!")

    # Identify the break and continue jump points.
    break_point = break_points[0]
    continue_point = header_chunk.id
    self.vprint(f"Loop breaks to {break_point} and continues to {continue_point}")

    # Now, go through each chunk, identify whether it ends in a jump or an if,
    # and fix up the ones that point out of (or back to the top of) the loop.
    for chunk in loop.chunks:
        if not chunk.next_chunks:
            # All chunks need a next chunk of some type, the only one that doesn't
            # is the end chunk which should never be part of a loop.
            raise Exception("Logic error!")

        if not isinstance(chunk, ByteCodeChunk):
            # We don't need to fix up loops, we already did this in a previous
            # fixup.
            continue

        if not chunk.actions:
            # Without this guard the indexing below would fail with a bare
            # IndexError; report it like every other invariant violation here.
            raise Exception("Logic error!")

        last_action = chunk.actions[-1]
        if not isinstance(last_action, AP2Action):
            # Already converted to an intermediate statement, nothing to do.
            continue

        if last_action.opcode in [AP2Action.THROW, AP2Action.RETURN]:
            # Ignore these for now, we'll fix these up in a later stage.
            continue

        if last_action.opcode == AP2Action.JUMP:
            # This is either an unconditional break/continue or an
            # internal jump.
            if len(chunk.next_chunks) != 1:
                raise Exception("Logic error!")

            replacement = self.__classify_loop_jump(
                chunk.next_chunks[0],
                break_point,
                continue_point,
                internal_jump_points,
                "jump",
            )
            if replacement is not None:
                chunk.actions[-1] = replacement
                chunk.next_chunks = []
            continue

        if last_action.opcode == AP2Action.IF:
            if_action = cast(IfAction, last_action)

            # Calculate true and false jump points.
            true_jump_point = offset_map[if_action.jump_if_true_offset]
            false_jump_points = [n for n in chunk.next_chunks if n != true_jump_point]
            if len(false_jump_points) != 1:
                raise Exception("Logic error!")
            false_jump_point = false_jump_points[0]

            # See if either branch is a break/continue/goto, true branch first.
            # A converted branch no longer counts as a successor of this chunk.
            true_action = self.__classify_loop_jump(
                true_jump_point,
                break_point,
                continue_point,
                internal_jump_points,
                "jump if true",
            )
            if true_action is not None:
                chunk.next_chunks = [n for n in chunk.next_chunks if n != true_jump_point]

            false_action = self.__classify_loop_jump(
                false_jump_point,
                break_point,
                continue_point,
                internal_jump_points,
                "jump if false",
            )
            if false_action is not None:
                chunk.next_chunks = [n for n in chunk.next_chunks if n != false_jump_point]

            if true_action is None and false_action is None:
                # This is an internal-only if statement, we don't care.
                continue

            # When only the false branch was converted, move it to the true
            # side and negate the condition so the statement has no empty
            # true branch.
            negate = true_action is None
            if negate:
                true_action, false_action = false_action, None

            chunk.actions[-1] = IntermediateIfStatement(
                if_action,
                [true_action],
                [false_action] if false_action else [],
                negate=negate,
            )

    # Now, we have converted all external jumps to either break or goto, so we don't
    # need to keep track of the next chunk aside from the break location.
    loop.next_chunks = [break_point]
    return loop

def __classify_loop_jump(self, target: int, break_point: int, continue_point: int, internal_jump_points: Set[int], kind: str) -> Optional[ConvertedAction]:
    """Pick the statement that replaces a jump to `target` inside a loop.

    `kind` only describes the jump in the verbose log message ("jump",
    "jump if true" or "jump if false"). Returns None when the target is
    another chunk inside the loop, meaning the jump should be left alone.
    """
    if target == break_point:
        self.vprint(f"Converting {kind} to loop break into break statement.")
        return BreakStatement()
    if target == continue_point:
        self.vprint(f"Converting {kind} to loop continue into continue statement.")
        return ContinueStatement()
    if target not in internal_jump_points:
        self.vprint(f"Converting {kind} to external point into goto statement.")
        return GotoStatement(target)
    return None
def __separate_loops(self, chunks: Sequence[ByteCodeChunk], dominators: Dict[int, Set[int]], offset_map: Dict[int, int]) -> List[Union[ByteCodeChunk, Loop]]:
# Find the start of the graph (the node with no previous entries). # Find the start of the graph (the node with no previous entries).
start_id = self.__get_entry_block(chunks) start_id = self.__get_entry_block(chunks)
chunks_by_id: Dict[int, Union[ByteCodeChunk, Loop]] = {chunk.id: chunk for chunk in chunks} chunks_by_id: Dict[int, Union[ByteCodeChunk, Loop]] = {chunk.id: chunk for chunk in chunks}
@ -505,7 +695,12 @@ class ByteCodeDecompiler(VerboseOutput):
# This loop does not contain any loops of its own. It is safe to # This loop does not contain any loops of its own. It is safe to
# convert. # convert.
self.vprint(f"Converting loop with header {header} and blocks {', '.join(str(b) for b in blocks)}.") self.vprint(f"Converting loop with header {header} and blocks {', '.join(str(b) for b in blocks)}.")
chunks_by_id[header] = Loop(header, [chunks_by_id[i] for i in blocks]) new_loop = Loop(header, [chunks_by_id[i] for i in blocks])
# Eliminate jumps that are to the beginning/end of the loop to
# make if statement detection later on easier.
new_loop = self.__analyze_loop_jumps(new_loop, offset_map)
chunks_by_id[header] = new_loop
# These blocks are now part of the loop, so we need to remove them # These blocks are now part of the loop, so we need to remove them
# from the IDed chunks as well as from existing loops. # from the IDed chunks as well as from existing loops.
@ -536,17 +731,30 @@ class ByteCodeDecompiler(VerboseOutput):
return [chunks_by_id[i] for i in chunks_by_id] return [chunks_by_id[i] for i in chunks_by_id]
def __separate_ifs(self, chunks: Sequence[Union[ByteCodeChunk, Loop]], offset_map: Dict[int, int]) -> List[ArbitraryCodeChunk]:
    """Return the given chunks, in order, as a new list.

    No if-statement analysis is performed here; offset_map is accepted
    but not used.
    """
    return list(chunks)
def __decompile(self) -> str:
    """Run the analysis passes in order and return the decompiled text.

    The passes are: control flow graph, dominators, loop separation, then
    if separation. The final structure is only printed through vprint; the
    returned string is currently the fixed placeholder "TODO".
    """
    # First, we need to construct a control flow graph.
    self.vprint("Generating control flow graph...")
    flow_chunks, offset_map = self.__graph_control_flow()

    # Now, compute dominators so we can locate back-refs.
    self.vprint("Generating dominator list...")
    dominator_sets = self.__compute_dominators(flow_chunks)

    # Now, separate chunks out into chunks and loops.
    self.vprint("Identifying and separating loops...")
    with_loops = self.__separate_loops(flow_chunks, dominator_sets, offset_map)

    # Now, identify any remaining control flow logic.
    self.vprint("Identifying and separating ifs...")
    with_loops_and_ifs = self.__separate_ifs(with_loops, offset_map)

    # At this point, we *should* have a directed graph where there are no
    # backwards refs and every fork has been identified as an if. This means
    # we can now walk and recursively generate pseudocode in one pass.
    self.vprint(with_loops_and_ifs)
    return "TODO"

View File

@ -707,7 +707,7 @@ class StoreRegisterAction(AP2Action):
class IfAction(AP2Action): class IfAction(AP2Action):
def __init__(self, offset: int, comparison: str, jump_if_true_offset: int) -> None: def __init__(self, offset: int, comparison: str, jump_if_true_offset: int) -> None:
super().__init__(offset, AP2Action.IF2) super().__init__(offset, AP2Action.IF)
self.comparison = comparison self.comparison = comparison
self.jump_if_true_offset = jump_if_true_offset self.jump_if_true_offset = jump_if_true_offset