Source code for py2cfg.builder

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Control flow graph builder.
"""

from collections import defaultdict, deque
from typing import Dict, List, Optional, DefaultDict, Deque, Tuple, Union
import ast  # type: ignore
import enum
from _ast import (
    Import,
    Break,
    AnnAssign,
    For,
    ImportFrom,
    Assign,
    AugAssign,
    Call,
    ClassDef,
    Compare,
    Expr,
    FunctionDef,
    If,
    Module,
    Return,
    stmt,
    While,
    Yield,
    Try,
    ExceptHandler,
)  # type: ignore
from py2cfg.model import Block, TryBlock, Link, CFG, FuncBlock


# Comparison operator -> the operator expressing its logical negation.
_INVERSE_OPS: Dict[type, type] = {
    ast.Eq: ast.NotEq,
    ast.NotEq: ast.Eq,
    ast.Lt: ast.GtE,
    ast.LtE: ast.Gt,
    ast.Gt: ast.LtE,
    ast.GtE: ast.Lt,
    ast.Is: ast.IsNot,
    ast.IsNot: ast.Is,
    ast.In: ast.NotIn,
    ast.NotIn: ast.In,
}


def invert(
    node: Union[Compare, ast.expr]
) -> Union[ast.Constant, ast.UnaryOp, Compare]:
    """
    Invert the operation in an ast node object (get its negation).

    Args:
        node: An ast node object.

    Returns:
        An ast node object containing the inverse (negation) of the input node:

        * a single-operator comparison gets its operator flipped
          (``a < b`` becomes ``a >= b``);
        * the constants ``True`` / ``False`` are swapped;
        * anything else, including chained comparisons such as
          ``a < b < c``, is wrapped in a ``not``.
    """
    # Only a single-operator comparison can be negated by flipping the
    # operator.  ``a < b < c`` means ``a < b and b < c``; flipping just the
    # first operator would NOT produce its negation, so chains fall through
    # to the generic ``not`` wrapper below.
    if isinstance(node, ast.Compare) and len(node.ops) == 1:
        flipped = _INVERSE_OPS[type(node.ops[0])]
        return ast.Compare(
            left=node.left, ops=[flipped()], comparators=node.comparators
        )

    # ``isinstance(..., bool)`` rather than ``in [True, False]`` so that the
    # integers 0 and 1 (which compare equal to False/True) are not mistaken
    # for boolean literals.
    if isinstance(node, ast.Constant) and isinstance(node.value, bool):
        return ast.Constant(value=not node.value)

    return ast.UnaryOp(op=ast.Not(), operand=node)
def merge_exitcases(
    exit1: Union[Compare, ast.BoolOp, ast.expr, None],
    exit2: Union[Compare, ast.BoolOp, ast.expr, None],
) -> Union[Compare, ast.BoolOp, ast.expr, None]:
    """
    Merge the exitcases of two Links.

    Args:
        exit1: The exitcase of a Link object.
        exit2: Another exitcase to merge with exit1.

    Returns:
        ``exit1 and exit2`` as an ``ast.BoolOp`` when both are set; otherwise
        whichever one is set (``exit2`` is returned when ``exit1`` is unset,
        even if ``exit2`` is unset too).
    """
    # Nothing to combine with: the other side (possibly None) is the result.
    if not exit1:
        return exit2
    if not exit2:
        return exit1
    # Both conditions must hold to follow the merged link.
    return ast.BoolOp(ast.And(), values=[exit1, exit2])
class TryEnum(enum.IntEnum):
    """
    Part of a ``try`` statement the builder is currently walking.

    The values are the consecutive integers starting at 1, in declaration
    order.
    """

    BODY = 1  # statements of the ``try:`` suite
    EXCEPT = 2  # statements of an ``except`` handler
    ELSE = 3  # statements of the ``else:`` suite
    FINAL = 4  # statements of the ``finally:`` suite
class TryStackObject:
    """
    Book-keeping record for one ``try`` statement that is being built.

    Instances are pushed on the builder's try stack while the statement's
    handlers, body, else and finally suites are visited.
    """

    def __init__(
        self,
        try_block: TryBlock,
        after_block: Block,
        has_final: bool,
        iter_state: Optional[TryEnum] = None,
    ) -> None:
        # Which suite of the statement is currently being walked (None until
        # the builder sets it).
        self.iter_state = iter_state
        # Whether the statement has a ``finally`` suite.
        self.has_final = has_final
        # Block in which building continues once the statement is done.
        self.after_block = after_block
        # Block opening the statement; its first statement is the Try node.
        self.try_block = try_block

    @property
    def node(self) -> ast.Try:
        """The first statement stored in ``try_block``."""
        statements = self.try_block.statements
        return statements[0]  # type: ignore
class CFGBuilder(ast.NodeVisitor):
    """
    Control flow graph builder.

    A control flow graph builder is an ast.NodeVisitor that can walk through
    a program's AST and iteratively build the corresponding CFG.
    """

    def __init__(
        self, short: bool = True, treebuf: Optional[DefaultDict[str, Deque]] = None
    ) -> None:
        """
        Args:
            short: Stored as ``isShort`` and forwarded to every CFG created
                   by this builder.
            treebuf: Mapping that holds the "loop_stack" and "try_stack"
                     deques. Builders created for nested class/function
                     definitions are handed the parent's mapping; a fresh
                     one is created when omitted.
        """
        self.isShort = short
        # Stack of the call blocks whose arguments are currently visited.
        self._callbuf: List[FuncBlock] = []
        self._treebuf = defaultdict(deque) if treebuf is None else treebuf

    @property
    def loop_stack(self):
        """Stack (leftmost = innermost) of (after_block, loop_guard) pairs."""
        return self._treebuf["loop_stack"]

    @property
    def try_stack(self) -> Deque[TryStackObject]:
        """Stack (leftmost = innermost) of the try statements being built."""
        return self._treebuf["try_stack"]

    # ---------- CFG building methods ---------- #

    def build(
        self, name: str, tree: Module, asynchr: bool = False, entry_id: int = 0
    ) -> CFG:
        """
        Build a CFG from an AST.

        Args:
            name: The name of the CFG being built.
            tree: The root of the AST from which the CFG must be built.
            asynchr: Boolean indicating whether the CFG being built represents
                     an asynchronous function or not. When the CFG of a Python
                     program is being built, it is considered like a
                     synchronous 'main' function.
            entry_id: Value for the id of the entry block of the CFG.

        Returns:
            The CFG produced from the AST.
        """
        self.cfg = CFG(name, asynchr=asynchr, short=self.isShort)
        # Tracking of the current block while building the CFG.
        self.current_id = entry_id
        self.current_block = self.new_block()
        self.cfg.entryblock = self.current_block
        # Actual building of the CFG is done here.
        self.visit(tree)
        self.clean_cfg(self.cfg.entryblock)
        return self.cfg

    def build_from_src(self, name: str, src: str) -> CFG:
        """
        Build a CFG from some Python source code.

        Args:
            name: The name of the CFG being built.
            src: A string containing the source code to build the CFG from.

        Returns:
            The CFG produced from the source code.
        """
        tree = ast.parse(src, mode="exec")
        return self.build(name, tree)

    def build_from_file(self, name: str, filepath: str) -> CFG:
        """
        Build a CFG from some Python source file.

        Args:
            name: The name of the CFG being built.
            filepath: The path to the file containing the Python source code
                      to build the CFG from.

        Returns:
            The CFG produced from the source file.
        """
        import tokenize

        # tokenize.open() decodes the file the way the interpreter would
        # (BOM / PEP 263 coding cookie, UTF-8 otherwise) instead of relying
        # on the platform's default text encoding.
        with tokenize.open(filepath) as src_file:
            src = src_file.read()
        return self.build_from_src(name, src)

    # ---------- Graph management methods ---------- #

    def new_block(self, statement=None) -> Block:
        """
        Create a new block with a new id.

        Args:
            statement: Optional first statement to put in the new block.

        Returns:
            A Block object with a new unique id.
        """
        self.current_id += 1
        block = Block(self.current_id)
        if statement is not None:
            block.add_statement(statement)
        return block

    def new_func_block(self) -> FuncBlock:
        """
        Create a new function block with a new id.

        Returns:
            A FuncBlock object with a new unique id.
        """
        self.current_id += 1
        return FuncBlock(self.current_id)

    def new_try_block(self, statement=None) -> TryBlock:
        """
        Create a new try block with a new id.

        Args:
            statement: Optional first statement to put in the new block.

        Returns:
            A TryBlock object with a new unique id.
        """
        self.current_id += 1
        block = TryBlock(self.current_id)
        if statement is not None:
            block.add_statement(statement)
        return block

    def add_statement(self, block: Block, statement: Union[stmt, Call]) -> None:
        """
        Add a statement to a block.

        Args:
            block: A Block object to which a statement must be added.
            statement: An AST node representing the statement that must be
                       added to the current block.
        """
        block.statements.append(statement)

    def add_exit(
        self,
        block: Block,
        nextblock: Block,
        exitcase: Union[Compare, None, ast.BoolOp, ast.expr] = None,
    ) -> None:
        """
        Add a new exit to a block.

        Args:
            block: A block to which an exit must be added.
            nextblock: The block to which control jumps from the new exit.
            exitcase: An AST node representing the 'case' (or condition)
                      leading to the exit from the block in the program.
        """
        newlink = Link(block, nextblock, exitcase)
        block.exits.append(newlink)
        nextblock.predecessors.append(newlink)

    def new_loopguard(self) -> Block:
        """
        Create a new block for a loop's guard if the current block is not
        empty. Links the current block to the new loop guard.

        Returns:
            The block to be used as new loop guard.
        """
        if self.current_block.is_empty() and len(self.current_block.exits) == 0:
            # If the current block is empty and has no exits, it is used as
            # entry block (condition test) for the loop.
            loopguard = self.current_block
        else:
            # Jump to a new block for the loop's guard if the current block
            # isn't empty or has exits.
            loopguard = self.new_block()
            self.add_exit(self.current_block, loopguard)
        return loopguard

    def new_classCFG(self, node: ClassDef, asynchr: bool = False) -> None:
        """
        Create a new sub-CFG for a class definition and add it to the class
        CFGs of the CFG being built.

        Args:
            node: The AST node containing the class definition.
            asynchr: Boolean indicating whether the class for which the CFG is
                     being built is asynchronous or not.
        """
        self.current_id += 1
        # A new sub-CFG is created for the body of the class definition and
        # added to the class CFGs of the current CFG.
        class_body = ast.Module(body=node.body)
        class_builder = CFGBuilder(self.isShort, self._treebuf)
        self.cfg.classcfgs[node.name] = class_builder.build(
            node.name, class_body, asynchr, self.current_id
        )
        # Resume numbering after the ids consumed by the sub-builder.
        self.current_id = class_builder.current_id + 1

    def new_functionCFG(self, node: FunctionDef, asynchr: bool = False) -> None:
        """
        Create a new sub-CFG for a function definition and add it to the
        function CFGs of the CFG being built.

        Args:
            node: The AST node containing the function definition.
            asynchr: Boolean indicating whether the function for which the CFG
                     is being built is asynchronous or not.
        """
        self.current_id += 1
        # A new sub-CFG is created for the body of the function definition and
        # added to the function CFGs of the current CFG.
        func_body = ast.Module(body=node.body)
        func_builder = CFGBuilder(self.isShort, self._treebuf)
        cfg = self.cfg.functioncfgs[node.name] = func_builder.build(
            node.name, func_body, asynchr, self.current_id
        )
        # Resume numbering after the ids consumed by the sub-builder.
        self.current_id = func_builder.current_id + 1

        cfg.lineno = node.lineno
        cfg.end_lineno = node.end_lineno

    def clean_cfg(self, block: Block, visited: Optional[List[Block]] = None) -> None:
        """
        Remove the useless (empty) blocks from a CFG.

        Args:
            block: The block from which to start traversing the CFG to clean
                   it.
            visited: A list of blocks that already have been visited by
                     clean_cfg (recursive function). A fresh list is used for
                     every top-level call when omitted.
        """
        # A literal ``[]`` default would be shared by every call (and every
        # builder), so a per-call list is created here instead.
        if visited is None:
            visited = []

        # Don't visit blocks twice.
        if block in visited:
            return
        visited.append(block)

        # Empty blocks are removed from the CFG.
        if block.is_empty():
            for pred in block.predecessors:
                for exit_link in block.exits:
                    self.add_exit(
                        pred.source,
                        exit_link.target,
                        merge_exitcases(pred.exitcase, exit_link.exitcase),
                    )
                    # Check if the exit hasn't yet been removed from
                    # the predecessors of the target block.
                    if exit_link in exit_link.target.predecessors:
                        exit_link.target.predecessors.remove(exit_link)
                # Check if the predecessor hasn't yet been removed from
                # the exits of the source block.
                if pred in pred.source.exits:
                    pred.source.exits.remove(pred)

            block.predecessors = []
            for exit_link in list(block.exits):
                self.clean_cfg(exit_link.target, visited)
            block.exits = []
        else:
            # Iterate over a snapshot: cleaning an empty successor appends to
            # and removes from this very list, which would otherwise make the
            # loop skip sibling exits.
            for exit_link in list(block.exits):
                self.clean_cfg(exit_link.target, visited)

    # ---------- Name resolution helpers ---------- #

    @staticmethod
    def _call_name(node: ast.AST) -> str:
        """
        Return a printable name for the callee expression of a call.

        Names give their identifier, attribute chains a dotted name,
        subscripts and nested calls the name of what is subscripted/called,
        constants their ``str()``. Any other expression yields a
        ``<NodeType>`` placeholder so that a name is always available.
        """
        if isinstance(node, ast.Name):
            return node.id
        if isinstance(node, ast.Attribute):
            # Recursion on series of calls to attributes.
            return f"{CFGBuilder._call_name(node.value)}.{node.attr}"
        if isinstance(node, ast.Subscript):
            return CFGBuilder._call_name(node.value)
        if isinstance(node, ast.Call):
            return CFGBuilder._call_name(node.func)
        if isinstance(node, ast.Constant):
            return str(node.value)
        return f"<{type(node).__name__}>"

    @staticmethod
    def _exception_name(exc: Optional[ast.AST]) -> Optional[str]:
        """
        Return the (dotted) name of a raised or caught exception expression.

        ``None`` is returned when no name can be determined (bare ``raise``,
        or an expression that is not a name, attribute chain or call of one).
        """
        if isinstance(exc, ast.Call):
            return CFGBuilder._exception_name(exc.func)
        if isinstance(exc, ast.Name):
            return exc.id
        if isinstance(exc, ast.Attribute):
            prefix = CFGBuilder._exception_name(exc.value)
            return None if prefix is None else f"{prefix}.{exc.attr}"
        return None

    @staticmethod
    def _handler_keys(type_expr: Optional[ast.AST]) -> List[Optional[str]]:
        """
        Return the keys under which an except handler must be registered.

        A bare ``except:`` gives ``[None]``; ``except (A, B):`` gives one key
        per resolvable element; other handlers give their single name. Types
        whose name cannot be determined give no key, so they are never
        confused with a bare handler.
        """
        if type_expr is None:
            return [None]
        candidates = type_expr.elts if isinstance(type_expr, ast.Tuple) else [type_expr]
        names = (CFGBuilder._exception_name(candidate) for candidate in candidates)
        return [name for name in names if name is not None]

    # ---------- AST Node visitor methods ---------- #

    def visit_ClassDef(self, node: ClassDef) -> None:
        """Record the class statement and build a sub-CFG for its body."""
        self.add_statement(self.current_block, node)
        self.new_classCFG(node, asynchr=False)

    def visit_Expr(self, node: Expr) -> None:
        """Record the expression statement, then visit its children."""
        self.add_statement(self.current_block, node)
        self.generic_visit(node)

    def visit_Call(self, node: Call) -> None:
        """
        Create a function block for a call and attach it either to the call
        whose arguments are being visited or to the current block.
        """
        func_name = self._call_name(node.func)

        func_block = self.new_func_block()
        self.add_statement(func_block, node)
        func_block.name = func_name

        if self._callbuf:
            # Func block is argument of last block in self._callbuf
            self._callbuf[-1].args.append(func_block)
        else:
            # Not inside argument context.
            self.current_block.func_calls.append(func_name)
            self.current_block.func_blocks.append(func_block)

        # Calls found in the positional arguments are attached to func_block.
        self._callbuf.append(func_block)
        for arg in node.args:
            self.visit(arg)
        top = self._callbuf.pop()
        assert hash(top) == hash(func_block)

    def visit_Assign(self, node: Assign) -> None:
        """Record the assignment, then visit its children."""
        self.add_statement(self.current_block, node)
        self.generic_visit(node)

    def visit_AnnAssign(self, node: AnnAssign) -> None:
        """Record the annotated assignment, then visit its children."""
        self.add_statement(self.current_block, node)
        self.generic_visit(node)

    def visit_AugAssign(self, node: AugAssign) -> None:
        """Record the augmented assignment, then visit its children."""
        self.add_statement(self.current_block, node)
        self.generic_visit(node)

    def visit_Raise(self, node):
        """
        Put the raise statement in a block of its own (or in the current one
        when it is still empty) and, inside a try statement, link it to the
        handler or finally code that it reaches.
        """
        if self.current_block.statements:
            raise_block = self.new_block(node)
            self.current_block.add_exit(raise_block)
            self.current_block = raise_block
        else:
            self.current_block.add_statement(node)

        if not self.try_stack:
            # Raise statement outside of try block
            # If we don't know where control jumps, this is the last block
            self.current_block = self.new_block(node)
            return

        # None when the exception has no resolvable name (e.g. a bare
        # ``raise``); such a raise can only match a bare ``except:`` handler.
        e_id = self._exception_name(node.exc)

        for tryobj in list(self.try_stack):

            def contains(item, state):
                return (
                    item in tryobj.try_block.except_blocks
                    and tryobj.iter_state == state
                )

            if contains(e_id, TryEnum.BODY):
                # try:
                #     raise StopIteration
                # except StopIteration:
                #     control_transfer_here = True
                # except Exception:
                #     control_transfer_here = False
                self.current_block.add_exit(tryobj.try_block.except_blocks[e_id])
                break
            elif contains(None, TryEnum.BODY):
                # try:
                #     raise StopIteration
                # except:
                #     control_transfers_here = 1
                self.current_block.add_exit(tryobj.try_block.except_blocks[None])
                break
            elif contains(e_id, TryEnum.EXCEPT) or contains(None, TryEnum.EXCEPT):
                if tryobj.has_final:
                    _after_block = self.new_block()
                    self.current_block.add_exit(_after_block)
                    _after_block.add_exit(self.current_block)
                    self.current_block = _after_block
                    for child in tryobj.node.finalbody:
                        if isinstance(child, ast.Raise):
                            break
                        self.visit(child)
            elif tryobj.iter_state == TryEnum.ELSE:
                if tryobj.has_final:
                    _after_block = self.new_block()
                    self.current_block.add_exit(_after_block)
                    _after_block.add_exit(self.current_block)
                    self.current_block = _after_block
                    for child in tryobj.node.finalbody:
                        if isinstance(child, ast.Raise):
                            break
                        self.visit(child)
            elif tryobj.iter_state != TryEnum.FINAL and tryobj.has_final:
                # try: raise Exception
                # finally: pass
                _after_block = self.new_block()
                self.current_block.add_exit(_after_block)
                self.current_block = _after_block
                for child in tryobj.node.finalbody:
                    if isinstance(child, ast.Raise):
                        # NOTE(review): the entry popped here is not pushed
                        # back, unlike the equivalent code in visit_Try --
                        # confirm this is intended.
                        self.try_stack.popleft()
                        break
                    self.visit(child)

        # Code following the raise goes to a block nothing jumps to.
        self.current_block = self.new_block()

    def visit_Assert(self, node):
        """Split the flow into a failing (final) and a succeeding block."""
        self.add_statement(self.current_block, node)
        # New block for the case in which the assertion 'fails'.
        failblock = self.new_block()
        self.add_exit(self.current_block, failblock, invert(node.test))
        # If the assertion fails, the current flow ends, so the fail block is a
        # final block of the CFG.
        self.cfg.finalblocks.append(failblock)
        # If the assertion is True, continue the flow of the program.
        successblock = self.new_block()
        self.add_exit(self.current_block, successblock, node.test)
        self.current_block = successblock
        self.generic_visit(node)

    def visit_If(self, node: If) -> None:
        """Build the condition, body, optional else and after-if blocks."""
        # If it already has something in it, we make a new block
        if self.current_block.statements:
            # Add the If statement at the beginning of the new block.
            cond_block = self.new_block()
            self.add_statement(cond_block, node)
            self.add_exit(self.current_block, cond_block)
            self.current_block = cond_block
        else:
            # Add the If statement at the end of the current block.
            self.add_statement(self.current_block, node)
        if isinstance(node.test, (ast.Compare, ast.Call)):
            self.visit(node.test)
        # Create a new block for the body of the if. (storing the True case)
        if_block = self.new_block()
        self.add_exit(self.current_block, if_block, node.test)

        # Create a block for the code after the if-else.
        afterif_block = self.new_block()

        # New block for the body of the else if there is an else clause.
        if node.orelse:
            else_block = self.new_block()
            self.add_exit(self.current_block, else_block, invert(node.test))
            self.current_block = else_block
            # Visit the children in the body of the else to populate the block.
            for child in node.orelse:
                self.visit(child)
            self.add_exit(self.current_block, afterif_block)
        else:
            self.add_exit(self.current_block, afterif_block, invert(node.test))

        # Visit children to populate the if block.
        self.current_block = if_block
        for child in node.body:
            self.visit(child)
        self.add_exit(self.current_block, afterif_block)

        # Continue building the CFG in the after-if block.
        self.current_block = afterif_block

    def visit_While(self, node: While) -> None:
        """Build the guard, body and after-loop blocks of a while loop."""
        # TODO while/else
        loop_guard = self.new_loopguard()
        self.current_block = loop_guard
        self.add_statement(self.current_block, node)
        if isinstance(node.test, ast.Call):
            self.visit(node.test)
        # New block for the case where the test in the while is True.
        while_block = self.new_block()
        self.add_exit(self.current_block, while_block, node.test)

        # New block for the case where the test in the while is False.
        afterwhile_block = self.new_block()
        self.add_exit(self.current_block, afterwhile_block, invert(node.test))

        # Populate the while block.
        self.current_block = while_block
        self.loop_stack.appendleft((afterwhile_block, loop_guard))
        for child in node.body:
            self.visit(child)
        top = self.loop_stack.popleft()
        assert top == (afterwhile_block, loop_guard)
        self.add_exit(self.current_block, loop_guard)

        # Continue building the CFG in the after-while block.
        self.current_block = afterwhile_block

    def visit_For(self, node: For) -> None:
        """Build the guard, body and after-loop blocks of a for loop."""
        # TODO for/else
        loop_guard = self.new_loopguard()
        self.current_block = loop_guard
        self.add_statement(self.current_block, node)
        if isinstance(node.iter, ast.Call):
            self.visit(node.iter)
        # New block for the body of the for-loop.
        for_block = self.new_block()
        self.add_exit(self.current_block, for_block, node.iter)

        # Block of code after the for loop.
        afterfor_block = self.new_block()
        self.add_exit(self.current_block, afterfor_block)
        self.current_block = for_block

        # Push exit destinations for break/continue statements.
        # On break, control jumps to afterfor_block.
        # On continue, control jumps to loop_guard.
        self.loop_stack.appendleft((afterfor_block, loop_guard))
        # Populate the body of the for loop.
        for child in node.body:
            self.visit(child)
        top = self.loop_stack.popleft()
        assert top == (afterfor_block, loop_guard)
        self.add_exit(self.current_block, loop_guard)

        # Continue building the CFG in the after-for block.
        self.current_block = afterfor_block

    def visit_Break(self, node: Break) -> None:
        """Jump to the block following the innermost loop."""
        assert self.loop_stack
        after_block: Block
        after_block, _ = self.loop_stack[0]
        self.current_block.add_statement(node)
        self.current_block.add_exit(after_block)
        # Code following the break goes to a block nothing jumps to.
        self.current_block = self.new_block()

    def visit_Continue(self, node):
        """Jump back to the guard of the innermost loop."""
        assert self.loop_stack
        loop_guard: Block
        _, loop_guard = self.loop_stack[0]
        self.add_statement(self.current_block, node)
        self.add_exit(self.current_block, loop_guard)
        # Code following the continue goes to a block nothing jumps to.
        self.current_block = self.new_block()

    def visit_Import(self, node: Import) -> None:
        """Record the import statement in the current block."""
        self.add_statement(self.current_block, node)

    def visit_ImportFrom(self, node: ImportFrom) -> None:
        """Record the from-import statement in the current block."""
        self.add_statement(self.current_block, node)

    def visit_FunctionDef(self, node: FunctionDef) -> None:
        """Record the def statement and build a sub-CFG for its body."""
        self.add_statement(self.current_block, node)
        self.new_functionCFG(node, asynchr=False)

    def visit_AsyncFunctionDef(self, node):
        """Record the async def statement and build an async sub-CFG."""
        self.add_statement(self.current_block, node)
        self.new_functionCFG(node, asynchr=True)

    def visit_Await(self, node):
        """Continue in a new block linked from the current one."""
        afterawait_block = self.new_block()
        self.add_exit(self.current_block, afterawait_block)
        self.generic_visit(node)
        self.current_block = afterawait_block

    def visit_Return(self, node: Return) -> None:
        """
        Record the return statement, run the innermost pending ``finally``
        suite (if any) and mark the resulting block as final.
        """
        if self.current_block.statements:
            return_block = self.new_block()
            self.current_block.add_exit(return_block)
            self.current_block = return_block
        self.add_statement(self.current_block, node)

        if self.try_stack:
            stackobj = self.try_stack[0]
            if stackobj.iter_state != TryEnum.FINAL and stackobj.has_final:
                after_return = self.new_block()
                self.current_block.add_exit(after_return)
                after_return.add_exit(self.current_block)
                self.current_block = after_return
                for child in stackobj.node.finalbody:
                    self.visit(child)

        self.cfg.finalblocks.append(self.current_block)
        # Continue in a new block but without any jump to it -> all code after
        # the return statement will not be included in the CFG.
        self.current_block = self.new_block()

    def visit_Yield(self, node: Yield) -> None:
        """Flag the CFG as asynchronous and continue in a new linked block."""
        self.cfg.asynchr = True
        afteryield_block = self.new_block()
        self.add_exit(self.current_block, afteryield_block)
        self.current_block = afteryield_block

    def visit_Try(self, node: Try):
        """
        Build the blocks of a try statement.

        The handlers are visited first so that raise statements met while
        visiting the body can be linked to the matching handler block; the
        body, else and finally suites follow.
        """
        try_block = self.new_try_block(node)
        after_tryblock = self.new_block()
        self.current_block.add_exit(try_block)

        stackobj = TryStackObject(try_block, after_tryblock, bool(node.finalbody))
        self.try_stack.appendleft(stackobj)

        stackobj.iter_state = TryEnum.EXCEPT
        for child in node.handlers:
            self.current_block = self.new_block()
            # If we encounter a raise statement during body iteration,
            # we can link the raise block to the appropriate exception block (if any).
            # ``except (A, B):`` registers the same block under both names.
            for key in self._handler_keys(child.type):
                try_block.except_blocks[key] = self.current_block
            self.visit(child)

        stackobj.iter_state = TryEnum.BODY
        self.current_block = try_block
        for child in node.body:
            self.visit(child)

        if node.orelse:
            stackobj.iter_state = TryEnum.ELSE
            else_block = self.new_block()
            self.current_block.add_exit(else_block)
            self.current_block = else_block
            for child in node.orelse:
                self.visit(child)

        self.current_block.add_exit(after_tryblock)
        self.current_block = after_tryblock
        if node.finalbody:
            stackobj.iter_state = TryEnum.FINAL
            for child in node.finalbody:
                if isinstance(child, ast.Raise):
                    # The raise is handled as if this try statement were
                    # already finished: hide it from visit_Raise.
                    top = self.try_stack.popleft()
                    self.visit_Raise(child)
                    self.try_stack.appendleft(top)
                    break
                self.visit(child)
            else:
                # The finally suite ran to its end without a raise.
                next_block = self.new_block()
                self.current_block.add_exit(next_block)
                self.current_block = next_block

        del self.try_stack[0]

    def visit_ExceptHandler(self, node: ExceptHandler):
        """Populate a handler block and link it to the after-try block."""
        assert self.try_stack
        for child in node.body:
            self.visit(child)
        self.current_block.add_exit(self.try_stack[0].after_block)