# Source code for rtfstruct.diagnostics
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Lee Powell
"""Diagnostics for parser, exporter, and writer recovery.
This module owns machine-readable diagnostic records and capped diagnostic
collection. It does not decide parser recovery policy; parser and writer modules
create diagnostics when they recover from malformed or unsupported input.
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import StrEnum
[docs]
class Severity(StrEnum):
"""Diagnostic severity levels."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
FATAL = "fatal"
[docs]
@dataclass(slots=True)
class Diagnostic:
"""Machine-readable parser, exporter, or writer diagnostic.
Attributes:
code: Stable diagnostic code.
message: Human-readable explanation.
severity: Severity of the condition.
offset: Optional byte or character offset where the condition occurred.
control_word: Optional RTF control word associated with the diagnostic.
destination: Optional RTF destination associated with the diagnostic.
"""
code: str
message: str
severity: Severity
offset: int | None = None
control_word: str | None = None
destination: str | None = None
[docs]
class Diagnostics:
    """Capped and deduplicated diagnostic collector.

    The collector records the first few diagnostics for each code and suppresses
    repeated duplicates. Suppression summaries can be emitted at the end of a
    parse so repeated defects do not overwhelm downstream tooling.

    Two independent limits apply: a global cap on the number of retained
    records and a per-code cap. Every diagnostic dropped by either limit is
    counted per code so that ``finalize`` can report how many were dropped.
    """

    def __init__(self, max_diagnostics: int = 10_000, per_code_limit: int = 20) -> None:
        """Create a diagnostics collector.

        Args:
            max_diagnostics: Maximum diagnostics to retain.
            per_code_limit: Maximum diagnostics retained for any single code.
        """
        self._max_diagnostics = max_diagnostics
        self._per_code_limit = per_code_limit
        self._items: list[Diagnostic] = []
        # Number of add() calls per code that reached the per-code check.
        self._seen_by_code: dict[str, int] = {}
        # Dropped diagnostics per code that have not yet been summarised.
        self._suppressed_by_code: dict[str, int] = {}

    @property
    def items(self) -> list[Diagnostic]:
        """Return retained diagnostics in insertion order.

        The returned object is the collector's own list, not a copy.
        """
        return self._items

    def add(
        self,
        code: str,
        message: str,
        severity: Severity = Severity.WARNING,
        *,
        offset: int | None = None,
        control_word: str | None = None,
        destination: str | None = None,
    ) -> None:
        """Add a diagnostic if collection limits allow it.

        A diagnostic that exceeds either the global cap or the per-code cap is
        not retained; it is only counted as suppressed for its code.

        Args:
            code: Stable diagnostic code.
            message: Human-readable diagnostic text.
            severity: Severity level.
            offset: Optional source offset.
            control_word: Optional related control word.
            destination: Optional active destination.
        """
        # Global cap: once full, nothing more is retained regardless of code.
        if len(self._items) >= self._max_diagnostics:
            self._suppressed_by_code[code] = self._suppressed_by_code.get(code, 0) + 1
            return

        count = self._seen_by_code.get(code, 0)
        self._seen_by_code[code] = count + 1

        # Per-code cap: keep only the first ``per_code_limit`` occurrences.
        if count >= self._per_code_limit:
            self._suppressed_by_code[code] = self._suppressed_by_code.get(code, 0) + 1
            return

        self._items.append(
            Diagnostic(
                code=code,
                message=message,
                severity=severity,
                offset=offset,
                control_word=control_word,
                destination=destination,
            )
        )

    def finalize(self) -> list[Diagnostic]:
        """Append suppression summaries and return retained diagnostics.

        One ``<code>_SUPPRESSED`` warning is appended per code that has pending
        suppressed occurrences, in sorted code order. Summaries respect the
        global cap: once the retained list is full, no further summaries are
        appended and their counts stay pending.

        Each suppression count is consumed when its summary is emitted, so
        calling this method again does not append duplicate summaries. A later
        call only reports suppressions recorded since the previous summary.

        Returns:
            The collector's list of retained diagnostics, including any
            summaries appended by this call.
        """
        # sorted() snapshots the keys, so popping inside the loop is safe.
        for code in sorted(self._suppressed_by_code):
            if len(self._items) >= self._max_diagnostics:
                break
            # Drain the counter so a repeated finalize() cannot re-emit it.
            count = self._suppressed_by_code.pop(code)
            self._items.append(
                Diagnostic(
                    code=f"{code}_SUPPRESSED",
                    message=f"Suppressed {count} further occurrences of {code}.",
                    severity=Severity.WARNING,
                )
            )
        return self._items