"""
Control Flow Graph-related analysis
"""
from networkx import DiGraph # type: ignore
import networkx as nx # type: ignore
from typing import Optional, Dict, Union, List
from cinspector.nodes import Util, BorderNode, Node
from cinspector.nodes import ForStatementNode, YForLoopNode, NForLoopNode
from cinspector.nodes import WhileStatementNode, YWhileLoopNode, NWhileLoopNode
from cinspector.nodes import IfStatementNode, YConditionNode, NConditionNode
from cinspector.nodes import SwitchNode, FunctionDefinitionNode
from cinspector.nodes import DoWhileLoopNode, DoStatementNode
class BasicBlock(Util):
    """Hold the list of nodes handed to the constructor.

    NOTE(review): presumably these are the statements that make up one basic
    block; nothing visible in this module constructs or consumes the class
    yet, so confirm against callers.
    """

    def __init__(self, nodes: list) -> None:
        """Store ``nodes`` unchanged (the list is not copied)."""
        self.nodes = nodes
class BaseCFG(Util):
    """Statement-level control flow graph over a list of statement nodes.

    The graph is built by the constructor.  ``self.cfg`` is a
    ``networkx.DiGraph`` whose nodes are statement nodes plus two
    ``BorderNode`` sentinels: ``self.start`` (``<START>``) and ``self.end``
    (``<END>``).
    """

    def __init__(self, stmts: List[Node]) -> None:
        """Store the statements and build the graph.

        Args:
            stmts: the statements to analyse, in source order.
        """
        # generate() brackets self.stmts with the two border nodes in place;
        # work on a copy so the caller's list is not mutated.
        self.stmts = list(stmts)
        self.start: Optional[BorderNode] = None
        self.end: Optional[BorderNode] = None
        self.cfg = DiGraph()
        self.generate()

    def execution_path(self):
        """Return every simple path from ``<START>`` to ``<END>``.

        Returns:
            A list of paths; each path is a list of graph nodes with the
            leading ``<START>`` and trailing ``<END>`` border nodes removed.
        """
        paths = nx.all_simple_paths(self.cfg, self.start, self.end)
        # filter out start and end
        return [path[1:-1] for path in paths]

    def generate(self):
        """Generate the control flow graph in ``self.cfg``.

        The initial state is a plain chain of the statements, bracketed by
        the border nodes, e.g.::

            int a, b;
            a = 1;
            b = 2;

        becomes::

            <START>
               |
            int a, b;
               |
            a = 1;
               |
            b = 2;
               |
             <END>

        Structured statements (compound, labeled, if, switch, for, do,
        while) are then replaced one at a time by their expanded form until
        none is reachable from ``<START>``.  Finally goto statements are
        linked to their labels and return statements are linked to
        ``<END>``.

        Note:
            ``self.stmts`` is extended in place with the two border nodes,
            so the method is meant to be run once per instance (the
            constructor already does so).
        """
        stmt_lst = self.stmts
        stmt_lst.insert(0, BorderNode(node_type='<START>'))
        stmt_lst.append(BorderNode(node_type='<END>'))
        self.start, self.end = stmt_lst[0], stmt_lst[-1]
        for _src, _dst in zip(stmt_lst, stmt_lst[1:]):
            self.cfg.add_edge(_src, _dst)

        # Record the label and the corresponding statement.  For
        #
        #     fail:
        #         free(sth);
        #         return -1;
        #
        # label_map maps "fail" to the free statement.
        label_map: Dict[str, Union[Node, List[Node]]] = dict()

        def _update_label_map(_c: Node, header: Union[Node, List[Node]]):
            """Re-point labels whose recorded statement was just replaced.

            The initially recorded label statement may be deconstructed
            later, so this variation has to be tracked.  ``header`` may also
            be a list, e.g. ``[ycond, ncond]`` for an if statement.  This
            function should be invoked whenever a node is deleted from
            ``self.cfg``.

            Args:
                _c (Node): the deleted node
                header (Union[Node, List[Node]]): the statement(s) that
                    replace ``_c``

            Returns:
                no return value
            """
            update = [_k for _k, _v in label_map.items() if _v == _c]
            for _u in update:
                label_map[_u] = header

        # iteratively detect branch
        cur = [self.start]
        while cur:
            # Branches that rejoin put the same node on the frontier once
            # per incoming path.  Dropping the repeats (first occurrence
            # kept, order preserved) does not change which node is rewritten
            # next, but stops the frontier growing with the number of paths.
            cur = list(dict.fromkeys(cur))
            pending = []
            changed = False
            for _c in cur:
                pending += list(self.cfg.successors(_c))
                if _c.node_type == 'compound_statement':
                    # divide compound statement directly
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    children = [
                        _ for _ in _c.children
                        if _.node_type not in ['{', '}']
                    ]
                    # skip empty compound statement
                    if not children:
                        continue
                    # link every child
                    for _src, _dst in zip(children, children[1:]):
                        self.cfg.add_edge(_src, _dst)
                    # link the first and last child node with pred and succ node
                    for _p in pred:
                        self.cfg.add_edge(_p, children[0])
                    for _s in succ:
                        self.cfg.add_edge(children[-1], _s)
                    # remove the old node
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, children[0])
                elif _c.node_type == 'labeled_statement':
                    # record the statement corresponding to label
                    label = _c.child_by_field_name('label').src
                    label_statement = _c.children[-1]
                    label_map[label] = label_statement
                    # replace label with statement
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    for _p in pred:
                        self.cfg.add_edge(_p, label_statement)
                    for _s in succ:
                        self.cfg.add_edge(label_statement, _s)
                    # remove label node
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, label_statement)
                elif _c.node_type == 'if_statement':
                    # Ideally, from:
                    #
                    #              pred
                    #                |
                    #            condition
                    #            /       \
                    #     consequence  alternative
                    #            \       /
                    #              succ
                    # to:
                    #              pred
                    #            /       \
                    #         ycond     ncond
                    #           |         |
                    #     consequence  alternative
                    #            \       /
                    #              succ
                    #
                    # If alternative doesn't exist, the edge between
                    # alternative and succ is replaced by the edge between
                    # ncond and succ.  From:
                    #
                    #              pred
                    #                |
                    #            condition
                    #            /       \
                    #     consequence     |
                    #            \       /
                    #              succ
                    # to:
                    #              pred
                    #            /       \
                    #         ycond       |
                    #           |         |
                    #     consequence   ncond
                    #            \       /
                    #              succ
                    assert (isinstance(_c, IfStatementNode))
                    ycond = YConditionNode(_c.condition)
                    ncond = NConditionNode(_c.condition)
                    self.cfg.add_edge(ycond, _c.consequence)
                    if _c.alternative:
                        self.cfg.add_edge(ncond, _c.alternative)
                    # add new edge. don't remove old edges, they will be
                    # removed while deleting old node
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    for _p in pred:
                        self.cfg.add_edge(_p, ycond)
                        self.cfg.add_edge(_p, ncond)
                    for _s in succ:
                        self.cfg.add_edge(_c.consequence, _s)
                        if _c.alternative:
                            self.cfg.add_edge(_c.alternative, _s)
                        else:
                            self.cfg.add_edge(ncond, _s)
                    # remove old node
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, [ycond, ncond])
                elif _c.node_type == 'switch_statement':
                    # Every case becomes its own branch pred -> SwitchNode
                    # -> case statements -> succ.
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    condition = _c.child_by_field_name('condition')
                    case_statement_lst = _c.descendants_by_type_name(
                        'case_statement')
                    _sw_node_lst = []
                    for _case_statement in case_statement_lst:
                        _value = _case_statement.child_by_field_name('value')
                        _sw_node = SwitchNode(condition, _value)
                        _sw_node_lst.append(_sw_node)
                        # keep only the statements of the case: drop the
                        # 'case' / 'default' / ':' tokens and the case value
                        _children = []
                        for _ in _case_statement.children:
                            if _.node_type in ['case', ':', 'default']:
                                continue
                            if _value and _.src == _value.src:
                                continue
                            _children.append(_)
                        for _p in pred:
                            self.cfg.add_edge(_p, _sw_node)
                        # statements exist in case
                        if _children:
                            self.cfg.add_edge(_sw_node, _children[0])
                            for _src, _dst in zip(_children, _children[1:]):
                                self.cfg.add_edge(_src, _dst)
                            for _s in succ:
                                self.cfg.add_edge(_children[-1], _s)
                        # no statement within case
                        else:
                            for _s in succ:
                                self.cfg.add_edge(_sw_node, _s)
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, _sw_node_lst)
                elif _c.node_type == 'return_statement':
                    # To avoid the statements after a return statement (e.g.
                    # a labeled_statement) being ignored directly, the return
                    # statement is unlinked from its successors later.
                    pass
                elif _c.node_type == 'for_statement':
                    # Two branches: yloop (-> body) and nloop, both leading
                    # to succ.  No edge from the body back to the loop
                    # header is added.
                    assert (isinstance(_c, ForStatementNode))
                    yloop = YForLoopNode(_c.initializer, _c.condition,
                                         _c.update)
                    nloop = NForLoopNode(_c.initializer, _c.condition,
                                         _c.update)
                    if _c.body:
                        loop_body = _c.body
                        self.cfg.add_edge(yloop, loop_body)
                    else:
                        # loop body may not exist
                        loop_body = yloop
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    for _p in pred:
                        self.cfg.add_edge(_p, yloop)
                        self.cfg.add_edge(_p, nloop)
                    for _s in succ:
                        self.cfg.add_edge(loop_body, _s)
                        self.cfg.add_edge(nloop, _s)
                    # remove old for statement
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, [yloop, nloop])
                elif _c.node_type == 'do_statement':
                    # pred -> body -> cond -> succ
                    assert (isinstance(_c, DoStatementNode))
                    cond = DoWhileLoopNode(_c.condition)
                    body = _c.body
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    self.cfg.add_edge(body, cond)
                    for _p in pred:
                        self.cfg.add_edge(_p, body)
                    for _s in succ:
                        self.cfg.add_edge(cond, _s)
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, [body])
                elif _c.node_type == 'while_statement':
                    # Same shape as the for statement above.
                    assert (isinstance(_c, WhileStatementNode))
                    yloop = YWhileLoopNode(_c.condition)
                    nloop = NWhileLoopNode(_c.condition)
                    if _c.body:
                        loop_body = _c.body
                        self.cfg.add_edge(yloop, loop_body)
                    else:
                        # loop body may not exist
                        loop_body = yloop
                    pred = list(self.cfg.predecessors(_c))
                    succ = list(self.cfg.successors(_c))
                    for _p in pred:
                        self.cfg.add_edge(_p, yloop)
                        self.cfg.add_edge(_p, nloop)
                    for _s in succ:
                        self.cfg.add_edge(loop_body, _s)
                        self.cfg.add_edge(nloop, _s)
                    # remove old while statement
                    self.cfg.remove_node(_c)
                    changed = True
                    _update_label_map(_c, [yloop, nloop])
                # Any other statement kind stays in the graph as a single
                # node.  (The original code had `assert "Undefined
                # statement"` here, which can never fail; ordinary
                # statements legitimately reach this point.)

                if changed:
                    # since the graph is changed, analyse from scratch
                    pending = [self.start]
                    break
            # start analysing next nodes
            cur = pending

        # link goto and label
        goto_lst = []
        # iterate over a snapshot because edges are edited inside the loop
        for _n in list(self.cfg.nodes):
            if _n.node_type == 'goto_statement':
                goto_lst.append(_n)
                # remove old edges
                for _s in list(self.cfg.successors(_n)):
                    self.cfg.remove_edge(_n, _s)
        for _g in goto_lst:
            label = _g.child_by_field_name('label').src
            assert label in label_map, f'goto to unknown label: {label}'
            if not isinstance(label_map[label], list):
                label_map[label] = [label_map[label]]
            for _ in label_map[label]:
                self.cfg.add_edge(_g, _)

        # unlink return statement and successors
        for _n in list(self.cfg.nodes):
            if _n.node_type == 'return_statement':
                for _s in list(self.cfg.successors(_n)):
                    self.cfg.remove_edge(_n, _s)
                self.cfg.add_edge(_n, self.end)

    def merge(self):
        """Merge single statements into basic blocks (not implemented yet).

        ``generate`` builds a DiGraph whose nodes are single statements;
        ``merge`` is meant to merge those statements into basic blocks.
        It is currently a no-op.
        """
        pass
class CFG(BaseCFG):
    """Control flow graph of a single function definition."""

    def __init__(self, function_def: FunctionDefinitionNode) -> None:
        """Build the graph from the children of ``function_def.body``.

        Args:
            function_def: the function definition to analyse; it is kept on
                ``self.function_def``.
        """
        self.function_def = function_def
        # The '{' and '}' tokens of the function body are not statements.
        stmt_lst = [
            child for child in self.function_def.body.children
            if child.node_type not in ['{', '}']
        ]
        super().__init__(stmt_lst)