Source code for python2verilog.backend.verilog.module

"""
Creates module from context and FSM
"""

from typing import Iterator, cast

from python2verilog import ir
from python2verilog.backend.verilog import ast as ver
from python2verilog.ir.expressions import UInt
from python2verilog.optimizer.helpers import backwards_replace
from python2verilog.utils.lines import Lines


[docs] class Module(ver.Module): """ A module that implements the python2verilog module interface """ def __init__(self, context: ir.Context, root: ver.Case): """ Creates a module wrapper from the context Requires context for I/O and declarations """ assert isinstance(root, ver.Case) assert isinstance(context, ir.Context) inputs: list[str] = [] for var in context.input_vars: inputs.append(var.py_name) outputs = [] for var in context.output_vars: outputs.append(var.ver_name) def make_debug_display(context: ir.Context): """ Creates a display statement for all signals $display("%0d, ...", ...); """ vars_: list[str] = [] vars_ += map( lambda x: x.ver_name, context.signals.instance_specific_values() ) vars_ += map(lambda x: x.py_name, context.input_vars) # module inputs vars_ += map(lambda x: x.ver_name, context.input_vars) # cache vars_ += map(lambda x: x.ver_name, context.output_vars) vars_ += map(lambda x: x.ver_name, context.local_vars) str_ = f'$display("{context.name},%s,' str_ += "=%0d,".join(vars_) + '=%0d", ' str_ += f"{context.state_var.ver_name}.name, " str_ += ", ".join(vars_) str_ += ");" return str_ def create_instance_zeroed_signals() -> Iterator[ver.Statement]: """ Instance signals that should always be set to zero be default """ for instance in context.generator_instances.values(): yield ver.NonBlockingSubsitution(instance.signals.ready, ir.UInt(0)) yield ver.NonBlockingSubsitution(instance.signals.start, ir.UInt(0)) always_body = ( [ ver.Statement("`ifdef DEBUG"), ver.Statement(make_debug_display(context)), ver.Statement("`endif"), ver.Statement(), ] + list(create_instance_zeroed_signals()) + [ ver.Statement(), ver.IfElse( context.signals.ready, cast( list[ver.Statement], [ ver.NonBlockingSubsitution( context.signals.valid, ir.UInt(0) ), ver.NonBlockingSubsitution( context.signals.done, ir.UInt(0) ), ], ), [], ), ] + [ ver.Statement(), ver.Statement(comment="Start signal takes precedence over reset"), ver.IfElse( ir.UBinOp(context.signals.reset, "||", context.signals.start), then_body=[ ver.NonBlockingSubsitution( context.state_var, context.idle_state, ), ver.NonBlockingSubsitution( context.signals.done, ir.UInt(0), ), ver.NonBlockingSubsitution( context.signals.valid, ir.UInt(0), ), ], else_body=[], ), ver.Statement(), ] + Module.make_start_ifelse(root, context) ) always = ver.PosedgeSyncAlways(clock=context.signals.clock, body=always_body) module_body: list[ver.Statement] = [] module_body += [ ver.Statement(comment="Local variables"), ] context.local_vars.sort(key=lambda x: x.ver_name) module_body += [ ver.Declaration(v.ver_name, reg=True, signed=True) for v in context.local_vars ] module_body += [ ver.Declaration(var.ver_name, reg=True, signed=True) for var in context.input_vars ] for instance in context.generator_instances.values(): module_body.append( ver.Statement( comment="================ Function Instance ================" ) ) module = context.namespace[instance.module_name] defaults: dict[ir.Var, ir.Var] = { module.signals.valid: instance.signals.valid, module.signals.done: instance.signals.done, module.signals.clock: context.signals.clock, module.signals.start: instance.signals.start, module.signals.reset: instance.signals.reset, module.signals.ready: instance.signals.ready, } for var in instance.inputs: module_body.append(ver.Declaration(name=var.ver_name, reg=True)) for var in instance.outputs: module_body.append(ver.Declaration(name=var.ver_name)) module_body.append( ver.Declaration(name=instance.signals.valid.ver_name, size=1) ) module_body.append( ver.Declaration(name=instance.signals.done.ver_name, size=1) ) module_body.append( ver.Declaration(name=instance.signals.start.ver_name, size=1, reg=True) ) module_body.append( ver.Declaration(name=instance.signals.ready.ver_name, size=1, reg=True) ) module_body.append( ver.Instantiation( instance.module_name, instance.var.ver_name, { key.py_name: value.ver_name for key, value in zip( module.input_vars, instance.inputs, ) } | { key.ver_name: value.ver_name for key, value in zip( module.output_vars, instance.outputs, ) } | {key.ver_name: value.ver_name for key, value in defaults.items()}, ) ) module_body.append(ver.Statement(comment="Core")) module_body.append(always) # Consistent state var ordering in transpile state_vars = { key: ir.UInt(index) for index, key in enumerate(sorted(context.states)) } super().__init__( name=context.name, body=module_body, localparams=state_vars, ) self.inputs = self._input_lines(inputs=inputs, context=context) self.outputs = self._output_lines(outputs=outputs, context=context) @staticmethod def _input_lines(inputs: list[str], context: ir.Context): """ Module inputs """ input_lines = Lines() input_lines += ( "// Function parameters (only need to be set when start is high):" ) for input_ in inputs: assert isinstance(input_, str) input_lines += f"input wire signed [31:0] {input_}," input_lines.blank() input_lines += ( f"input wire {context.signals.clock.ver_name}, " "// clock for sync" ) input_lines += ( f"input wire {context.signals.reset.ver_name}, " "// set high to reset, i.e. done will be high" ) input_lines += ( f"input wire {context.signals.start}, " + "// set high to capture inputs (in same cycle) and start generating" ) input_lines.blank() input_lines += "// Implements the ready/valid handshake" input_lines += ( f"input wire {context.signals.ready}, " "// set high when caller is ready for output" ) return input_lines @staticmethod def _output_lines(outputs: list[str], context: ir.Context): """ Get outputs """ output_lines = Lines() output_lines += ( f"output reg {context.signals.valid}, " "// is high if output values are valid" ) output_lines.blank() output_lines += ( f"output reg {context.signals.done}, " "// is high if module done outputting" ) output_lines.blank() output_lines += "// Output values as a tuple with respective index(es)" for output in outputs: assert isinstance(output, str) output_lines += f"output reg signed [31:0] {output}," return output_lines
[docs] @staticmethod def make_start_ifelse(root: ver.Case, context: ir.Context) -> list[ver.Statement]: """ if (_start) begin ... end else begin ... end """ then_body: list[ver.Statement] = [] for var in context.input_vars: then_body.append( ver.NonBlockingSubsitution( ir.Var(py_name=var.ver_name, ver_name=var.ver_name), ir.Expression(var.py_name), ) ) if context.optimization_level > 0: # Optimization to include the entry state in the start ifelse # Map cached inputs to input signals (cached inputs not updated yet) mapping = { ir.Var(py_name=var.ver_name, ver_name=var.ver_name): ir.Expression( var.py_name ) for var in context.input_vars } # Get statements in entry state stmt_stack: list[ver.Statement] = [] for item in root.case_items: if item.condition == context.entry_state: stmt_stack += item.statements then_body += item.statements root.case_items.remove(item) break # Replace usage of cached inputs with input signals while stmt_stack: stmt = stmt_stack.pop() if isinstance(stmt, ver.NonBlockingSubsitution): stmt.rvalue = backwards_replace(stmt.rvalue, mapping) elif isinstance(stmt, ver.IfElse): stmt.condition = backwards_replace(stmt.condition, mapping) stmt_stack += stmt.then_body stmt_stack += stmt.else_body else: raise TypeError(f"Unexpected {type(stmt)} {stmt}") else: then_body.append( ver.NonBlockingSubsitution(context.state_var, context.entry_state) ) if_else = ver.IfElse( context.signals.start, then_body, [ ver.Statement( comment="If ready or not valid, then continue computation" ), ver.IfElse( ir.UBinOp( context.signals.ready, "||", ir.UnaryOp("!", context.signals.valid), ), [root], [], ), ], ) return [if_else]