"""
IncreaseWorkPerClockCycle
"""
import copy
import itertools
import logging
from typing import Any, Callable, Iterator
from python2verilog import ir
from python2verilog.optimizer.helpers import backwards_replace
from python2verilog.utils.peek_counter import PeekCounter
from python2verilog.utils.typed import guard, guard_dict, typed
[docs]
class IncreaseWorkPerClockCycle:
"""
A closure for the increase work per clock cycle optimizer
`threshold` (an integer >= 0) tunes
how much the code can be unrolled (and duplicated)
A larger `threshold` will result in a reduction in clock cycles,
but an increase in hardware usage
If a python generator function generates all of its outputs in O(n) time:
1) hardware optimized with `threshold=0` completes in O(n) cycles
2) hardware optimized with `threshold=x` for `x > 0` completes in O(n/(x+1)) cycles
"""
def __init__(self, root: ir.Node, threshold: int = 0):
self.visited: set[str] = set()
self.threshold = threshold
counter = PeekCounter()
self.make_unique = counter.next
self.make_unique_peek = counter.peek
self.apply(root)
[docs]
@staticmethod
def exclusive_vars(variables) -> Iterator[ir.ExclusiveVar]:
"""
Filters for exclusive variables
"""
return filter(lambda var: isinstance(var, ir.ExclusiveVar), variables)
[docs]
@staticmethod
def map_to_ver_name(variables) -> Iterator[str]:
"""
Maps a variable to its ver_name
"""
return map(lambda var: var.ver_name, variables)
[docs]
@staticmethod
def chain_generators(
iterable: Iterator[Any], *functions: Callable[[Iterator[Any]], Iterator[Any]]
) -> Iterator[Any]:
"""
Applies transformations to iterators
"""
for func in functions:
iterable = func(iterable)
yield from iterable
[docs]
def apply_recursive(
self,
edge: ir.Edge,
new_mapping: dict[ir.Var, ir.Expression],
old_mapping: dict[ir.Var, ir.Expression],
visited_path: dict[str, int],
) -> ir.Edge:
"""
Recursively visits the children, conditionally adding them to an optimal path
The concept of mapping is as follows:
If
a = 1
b = a
then b == 1, if no clock cycle occurs in-between,
at the end of this block, mapping would be
{a: 1, b: 1}
:param mapping: values of variables, given the previous logic
:param visited: visited unique_ids and exclusive vars for this nonclocked sequence
"""
assert guard(edge, ir.Edge)
assert guard_dict(new_mapping, ir.Var, ir.Expression)
assert guard_dict(old_mapping, ir.Var, ir.Expression)
assert guard(visited_path, dict)
for key, value in visited_path.items():
assert isinstance(key, (str, ir.Var)), f"{type(key)}"
assert guard(value, int), f"{type(value)}"
node = edge.child
assert node
logging.debug(
"%s node %s, new %s, old %s",
self.apply_recursive.__name__,
node,
new_mapping,
old_mapping,
)
# If clocked, then switch to new mapping
if isinstance(edge, ir.ClockedEdge):
old_mapping = copy.copy(new_mapping)
# Check for cyclic paths
if (
node.unique_id in visited_path
and visited_path[node.unique_id] > self.threshold
):
assert guard(node, ir.Node)
self.apply(node)
return edge
# if "_data = _data__state2_while0_out0" == str(node):
# breakpoint()
# Exclusive vars can only be visited once
exclusive_vars = set(node.exclusions())
if exclusive_vars & visited_path.keys():
logging.debug(
"Intersection %s = {%s & %s} ending on %s",
exclusive_vars & visited_path.keys(),
exclusive_vars,
visited_path.keys(),
node,
)
assert guard(edge, ir.ClockedEdge)
assert guard(node, ir.Node)
self.apply(node)
return edge
# Update visited
if isinstance(node, ir.AssignNode) and isinstance(node.lvalue, ir.ExclusiveVar):
assert guard(
node.lvalue.exclusive_group, str
), f"{type(node.lvalue.exclusive_group)}"
visited_path[node.lvalue.exclusive_group] = 1
visited_path[node.unique_id] = visited_path.get(node.unique_id, 0) + 1
new_edge: ir.Edge = ir.NonClockedEdge(
unique_id=f"{edge.unique_id}_optimal_{self.make_unique()}"
)
if isinstance(node, ir.IfElseNode):
new_edge.child = ir.IfElseNode(
unique_id=f"{node.unique_id}_optimal_{self.make_unique()}",
condition=backwards_replace(node.condition, old_mapping),
true_edge=self.apply_recursive(
edge=node.true_edge,
new_mapping=copy.copy(new_mapping),
old_mapping=copy.copy(old_mapping),
visited_path=copy.copy(visited_path),
),
false_edge=self.apply_recursive(
edge=node.false_edge,
new_mapping=copy.copy(new_mapping),
old_mapping=copy.copy(old_mapping),
visited_path=copy.copy(visited_path),
),
)
elif isinstance(node, ir.AssignNode):
unique_id = f"{node.unique_id}_optimal_{self.make_unique()}"
new_rvalue = backwards_replace(node.rvalue, old_mapping)
new_mapping[node.lvalue] = new_rvalue
if node.has_child():
assert guard(node.child, ir.Edge)
new_edge.child = ir.AssignNode(
unique_id=unique_id,
lvalue=node.lvalue,
rvalue=new_rvalue,
child=self.apply_recursive(
edge=node.child,
new_mapping=new_mapping,
old_mapping=old_mapping,
visited_path=visited_path,
),
)
else:
new_edge.child = node
else:
raise RuntimeError(f"{type(node)}")
return new_edge
[docs]
def apply(
self,
root: ir.Node,
) -> None:
"""
Optimizes a node, by increasing amount of work done in a cycle.
Creates an optimal path that maximizes nonclocked edges.
"""
assert guard(root, ir.Node)
logging.debug("%s on %s", self.apply.__name__, root)
if root.unique_id in self.visited:
return
self.visited.add(root.unique_id)
if isinstance(root, ir.BasicNode):
mapper: dict[ir.Var, ir.Expression] = {}
visited_path: dict[str, int] = {}
if isinstance(root, ir.AssignNode):
mapper[root.lvalue] = root.rvalue
if isinstance(root.lvalue, ir.ExclusiveVar):
visited_path[root.lvalue.exclusive_group] = 1
if root.has_child():
assert guard(root.child, ir.Edge)
assert guard(root.child.child, ir.Node)
root.optimal_child = self.apply_recursive(
root.child, mapper, {}, visited_path
)
elif isinstance(root, ir.IfElseNode):
root.optimal_true_edge = self.apply_recursive(root.true_edge, {}, {}, {})
root.optimal_false_edge = self.apply_recursive(root.false_edge, {}, {}, {})
else:
raise RuntimeError(f"{type(root)}")