rv/rvgen: refactor automata.py to use iterator-based parsing

Refactor the DOT file parsing logic in automata.py to use Python's
iterator-based patterns instead of manual cursor indexing. The previous
implementation relied on while loops with explicit cursor management,
which made the code prone to off-by-one errors and would crash on
malformed input files containing empty lines.

The new implementation uses enumerate and itertools.islice to iterate
over lines, eliminating manual cursor tracking. Functions that search
for specific markers now use for loops with early returns and explicit
AutomataError exceptions for missing markers, rather than assuming the
markers exist. Additional bounds checking ensures that split line
arrays have sufficient elements before accessing specific indices,
preventing IndexError exceptions on malformed DOT files.

The matrix creation and event variable extraction methods now use
functional patterns with map combined with itertools.islice,
making the intent clearer while maintaining the same behavior. Minor
improvements include using extend instead of append in a loop, adding
empty file validation, and replacing enumerate with range where the
enumerated value was unused.

Signed-off-by: Wander Lairson Costa <wander@redhat.com>
Reviewed-by: Gabriele Monaco <gmonaco@redhat.com>
Link: https://lore.kernel.org/r/20260223162407.147003-12-wander@redhat.com
Signed-off-by: Gabriele Monaco <gmonaco@redhat.com>
This commit is contained in:
Wander Lairson Costa
2026-02-23 13:17:54 -03:00
committed by Gabriele Monaco
parent d474fedcc5
commit 0c25d8c8dc

View File

@@ -11,6 +11,7 @@
import ntpath
import re
from typing import Iterator
from itertools import islice
class _ConstraintKey:
"""Base class for constraint keys."""
@@ -89,37 +90,54 @@ class Automata:
return model_name
def __open_dot(self) -> list[str]:
cursor = 0
dot_lines = []
try:
with open(self.__dot_path) as dot_file:
dot_lines = dot_file.read().splitlines()
dot_lines = dot_file.readlines()
except OSError as exc:
raise AutomataError(exc.strerror) from exc
# checking the first line:
line = dot_lines[cursor].split()
if not dot_lines:
raise AutomataError(f"{self.__dot_path} is empty")
if (line[0] != "digraph") or (line[1] != "state_automaton"):
# checking the first line:
line = dot_lines[0].split()
if len(line) < 2 or line[0] != "digraph" or line[1] != "state_automaton":
raise AutomataError(f"Not a valid .dot format: {self.__dot_path}")
else:
cursor += 1
return dot_lines
def __get_cursor_begin_states(self) -> int:
cursor = 0
while self.__dot_lines[cursor].split()[0] != "{node":
cursor += 1
return cursor
for cursor, line in enumerate(self.__dot_lines):
split_line = line.split()
if len(split_line) and split_line[0] == "{node":
return cursor
raise AutomataError("Could not find a beginning state")
def __get_cursor_begin_events(self) -> int:
cursor = 0
while self.__dot_lines[cursor].split()[0] != "{node":
cursor += 1
while self.__dot_lines[cursor].split()[0] == "{node":
cursor += 1
# skip initial state transition
cursor += 1
state = 0
cursor = 0 # make pyright happy
for cursor, line in enumerate(self.__dot_lines):
line = line.split()
if not line:
continue
if state == 0:
if line[0] == "{node":
state = 1
elif line[0] != "{node":
break
else:
raise AutomataError("Could not find beginning event")
cursor += 1 # skip initial state transition
if cursor == len(self.__dot_lines):
raise AutomataError("Dot file ended after event beginning")
return cursor
def __get_state_variables(self) -> tuple[list[str], str, list[str]]:
@@ -131,9 +149,12 @@ class Automata:
cursor = self.__get_cursor_begin_states()
# process nodes
while self.__dot_lines[cursor].split()[0] == "{node":
line = self.__dot_lines[cursor].split()
raw_state = line[-1]
for line in islice(self.__dot_lines, cursor, None):
split_line = line.split()
if not split_line or split_line[0] != "{node":
break
raw_state = split_line[-1]
# "enabled_fired"}; -> enabled_fired
state = raw_state.replace('"', '').replace('};', '').replace(',', '_')
@@ -141,16 +162,14 @@ class Automata:
initial_state = state[len(self.init_marker):]
else:
states.append(state)
if "doublecircle" in self.__dot_lines[cursor]:
if "doublecircle" in line:
final_states.append(state)
has_final_states = True
if "ellipse" in self.__dot_lines[cursor]:
if "ellipse" in line:
final_states.append(state)
has_final_states = True
cursor += 1
states = sorted(set(states))
states.remove(initial_state)
@@ -163,18 +182,21 @@ class Automata:
return states, initial_state, final_states
def __get_event_variables(self) -> tuple[list[str], list[str]]:
events: list[str] = []
envs: list[str] = []
# here we are at the begin of transitions, take a note, we will return later.
cursor = self.__get_cursor_begin_events()
events = []
envs = []
while self.__dot_lines[cursor].lstrip()[0] == '"':
for line in map(str.lstrip, islice(self.__dot_lines, cursor, None)):
if not line.startswith('"'):
break
# transitions have the format:
# "all_fired" -> "both_fired" [ label = "disable_irq" ];
# ------------ event is here ------------^^^^^
if self.__dot_lines[cursor].split()[1] == "->":
line = self.__dot_lines[cursor].split()
event = "".join(line[line.index("label") + 2:-1]).replace('"', '')
split_line = line.split()
if len(split_line) > 1 and split_line[1] == "->":
event = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '')
# when a transition has more than one label, they are like this
# "local_irq_enable\nhw_local_irq_enable_n"
@@ -187,7 +209,7 @@ class Automata:
ev, *constr = i.split(";")
if constr:
if len(constr) > 2:
raise ValueError("Only 1 constraint and 1 reset are supported")
raise AutomataError("Only 1 constraint and 1 reset are supported")
envs += self.__extract_env_var(constr)
events.append(ev)
else:
@@ -195,13 +217,12 @@ class Automata:
# "enable_fired" [label = "enable_fired\ncondition"];
# ----- label is here -----^^^^^
# label and node name must be the same, condition is optional
state = self.__dot_lines[cursor].split("label")[1].split('"')[1]
state = line.split("label")[1].split('"')[1]
_, *constr = state.split("\\n")
if constr:
if len(constr) > 1:
raise ValueError("Only 1 constraint is supported in the state")
raise AutomataError("Only 1 constraint is supported in the state")
envs += self.__extract_env_var([constr[0].replace(" ", "")])
cursor += 1
return sorted(set(events)), sorted(set(envs))
@@ -265,18 +286,24 @@ class Automata:
nr_state += 1
# declare the matrix....
matrix = [[self.invalid_state_str for x in range(nr_event)] for y in range(nr_state)]
matrix = [[self.invalid_state_str for _ in range(nr_event)] for _ in range(nr_state)]
constraints: dict[_ConstraintKey, list[str]] = {}
# and we are back! Let's fill the matrix
cursor = self.__get_cursor_begin_events()
while self.__dot_lines[cursor].lstrip()[0] == '"':
if self.__dot_lines[cursor].split()[1] == "->":
line = self.__dot_lines[cursor].split()
origin_state = line[0].replace('"', '').replace(',', '_')
dest_state = line[2].replace('"', '').replace(',', '_')
possible_events = "".join(line[line.index("label") + 2:-1]).replace('"', '')
for line in map(str.lstrip,
islice(self.__dot_lines, cursor, None)):
if not line or line[0] != '"':
break
split_line = line.split()
if len(split_line) > 2 and split_line[1] == "->":
origin_state = split_line[0].replace('"', '').replace(',', '_')
dest_state = split_line[2].replace('"', '').replace(',', '_')
possible_events = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '')
for event in possible_events.split("\\n"):
event, *constr = event.split(";")
if constr:
@@ -287,22 +314,21 @@ class Automata:
self.self_loop_reset_events.add(event)
matrix[states_dict[origin_state]][events_dict[event]] = dest_state
else:
state = self.__dot_lines[cursor].split("label")[1].split('"')[1]
state = line.split("label")[1].split('"')[1]
state, *constr = state.replace(" ", "").split("\\n")
if constr:
constraints[_StateConstraintKey(states_dict[state])] = constr
cursor += 1
return matrix, constraints
def __store_init_events(self) -> tuple[list[bool], list[bool]]:
events_start = [False] * len(self.events)
events_start_run = [False] * len(self.events)
for i, _ in enumerate(self.events):
for i in range(len(self.events)):
curr_event_will_init = 0
curr_event_from_init = False
curr_event_used = 0
for j, _ in enumerate(self.states):
for j in range(len(self.states)):
if self.function[j][i] != self.invalid_state_str:
curr_event_used += 1
if self.function[j][i] == self.initial_state: