arch-researcher/decision_engine.py

259 lines
11 KiB
Python
Raw Permalink Normal View History

2026-04-27 18:44:22 +00:00
"""
Модуль DecisionEngine классификация архитектурного риска и
выбор стратегии модернизации.
Реализует многокритериальный анализ на основе набора метрик,
полученных от MetricsEngine и DependencyAnalyzer.
Логика принятия решений:
Каждый модуль получает «очки риска» по нескольким независимым
критериям. Итоговый уровень риска определяется числом набранных
очков, что обеспечивает интерпретируемость и прозрачность решения.
Стратегии (по Lehman & Belady, 1985; Fowler, 2018):
LOW / MEDIUM KEEP / REFACTOR
HIGH REENGINEER
CRITICAL REPLACE
"""
import logging
from typing import Dict, List, Tuple
from config import AnalyzerConfig, DEFAULT_CONFIG
from models import (
AnalysisReport,
ModuleDecision,
ModuleMetrics,
ModernizationStrategy,
RiskLevel,
)
logger = logging.getLogger(__name__)
class DecisionEngine:
"""
Классифицирует модули по уровню архитектурного риска
и формирует рекомендации по стратегии модернизации.
"""
def __init__(self, config: AnalyzerConfig = DEFAULT_CONFIG):
self.config = config
self.t = config.thresholds
# ------------------------------------------------------------------
# Публичный API
# ------------------------------------------------------------------
def analyze(
self,
report: AnalysisReport,
cycles: List[List[str]],
) -> AnalysisReport:
"""
Заполняет report.decisions и системные агрегаты.
:param report: отчёт с заполненными modules
:param cycles: список циклов зависимостей от DependencyAnalyzer
:returns: обновлённый отчёт
"""
decisions: List[ModuleDecision] = []
for module_name, module in report.modules.items():
score, reasons = self._score_module(module)
risk = self._score_to_risk(score)
strategy = self._risk_to_strategy(risk)
decision = ModuleDecision(
module_name=module_name,
risk_level=risk,
strategy=strategy,
reasons=reasons,
priority=self._compute_priority(score, module),
)
decisions.append(decision)
# Сортируем по приоритету (меньше = важнее)
decisions.sort(key=lambda d: d.priority)
report.decisions = decisions
report.dependency_cycles = cycles
# Системные агрегаты
report.total_files = sum(len(m.files) for m in report.modules.values())
report.total_nloc = sum(m.total_nloc for m in report.modules.values())
report.total_functions = sum(m.total_functions for m in report.modules.values())
all_ccn = [
f.ccn
for m in report.modules.values()
for f in m.functions
]
report.avg_system_ccn = sum(all_ccn) / len(all_ccn) if all_ccn else 0.0
report.system_risk_level = self._system_risk(decisions)
report.recommended_system_strategy = self._risk_to_strategy(report.system_risk_level)
logger.info(
"DecisionEngine: системный риск=%s, стратегия=%s",
report.system_risk_level.value,
report.recommended_system_strategy.name,
)
return report
# ------------------------------------------------------------------
# Оценка отдельного модуля
# ------------------------------------------------------------------
def _score_module(self, module: ModuleMetrics) -> Tuple[int, List[str]]:
"""
Начисляет очки риска по каждому критерию.
Возвращает (суммарный_счёт, список_причин).
Каждый критерий даёт 1 (warn) или 2 (critical) очка.
"""
score = 0
reasons: List[str] = []
# --- Цикломатическая сложность ---
if module.max_ccn > self.t.ccn_critical:
score += 2
reasons.append(
f"Критическая CCN: max={module.max_ccn} (порог {self.t.ccn_critical})"
)
elif module.avg_ccn > self.t.ccn_warn:
score += 1
reasons.append(
f"Повышенная средняя CCN: avg={module.avg_ccn:.1f} (порог {self.t.ccn_warn})"
)
# --- Доля сложных функций ---
if module.heavy_functions_ratio > self.t.heavy_functions_ratio_critical:
score += 2
reasons.append(
f"Критическая доля сложных функций: "
f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_critical:.0%})"
)
elif module.heavy_functions_ratio > self.t.heavy_functions_ratio_warn:
score += 1
reasons.append(
f"Высокая доля сложных функций: "
f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_warn:.0%})"
)
# --- Исходящая связность (fan-out) ---
if module.coupling_out > self.t.coupling_out_critical:
score += 2
reasons.append(
f"Критическая исходящая связность: C_out={module.coupling_out} "
f"(порог {self.t.coupling_out_critical})"
)
elif module.coupling_out > self.t.coupling_out_warn:
score += 1
reasons.append(
f"Высокая исходящая связность: C_out={module.coupling_out} "
f"(порог {self.t.coupling_out_warn})"
)
# --- Входящая связность (fan-in) ---
if module.coupling_in > self.t.coupling_in_critical:
score += 2
reasons.append(
f"Критическая входящая связность: C_in={module.coupling_in} "
f"(порог {self.t.coupling_in_critical}) — нестабильный центральный узел"
)
elif module.coupling_in > self.t.coupling_in_warn:
score += 1
reasons.append(
f"Высокая входящая связность: C_in={module.coupling_in} "
f"(порог {self.t.coupling_in_warn})"
)
# --- Нестабильность (Instability, метрика Р. Мартина) ---
if module.instability > self.t.instability_critical:
score += 2
reasons.append(
f"Критическая нестабильность: I={module.instability:.2f} "
f"(порог {self.t.instability_critical})"
)
elif module.instability > self.t.instability_warn:
score += 1
reasons.append(
f"Высокая нестабильность: I={module.instability:.2f} "
f"(порог {self.t.instability_warn})"
)
# --- Циклические зависимости ---
if module.has_cycles:
score += 3
reasons.append("Участвует в циклических зависимостях — признак архитектурной деградации")
return score, reasons
# ------------------------------------------------------------------
# Преобразование очков в решения
# ------------------------------------------------------------------
@staticmethod
def _score_to_risk(score: int) -> RiskLevel:
"""Преобразует суммарный счёт в уровень риска."""
if score == 0:
return RiskLevel.LOW
if score <= 2:
return RiskLevel.MEDIUM
if score <= 5:
return RiskLevel.HIGH
return RiskLevel.CRITICAL
@staticmethod
def _risk_to_strategy(risk: RiskLevel) -> ModernizationStrategy:
"""
Определяет стратегию модернизации по уровню риска.
Логика основана на классификации Fowler (2018) и
Lehman & Belady (1985).
"""
return {
RiskLevel.LOW: ModernizationStrategy.KEEP,
RiskLevel.MEDIUM: ModernizationStrategy.REFACTOR,
RiskLevel.HIGH: ModernizationStrategy.REENGINEER,
RiskLevel.CRITICAL: ModernizationStrategy.REPLACE,
}[risk]
@staticmethod
def _compute_priority(score: int, module: ModuleMetrics) -> int:
"""
Приоритет в плане рефакторинга: меньше = выше приоритет.
Модули с циклами и высоким C_in получают наивысший приоритет
(они блокируют изменения во множестве других модулей).
"""
priority = 1000 - score * 100
# Модули с высоким fan-in важнее менять первыми —
# они наиболее болезненные для всей системы
if module.coupling_in > 0:
priority -= module.coupling_in * 10
if module.has_cycles:
priority -= 200
return max(priority, 1)
@staticmethod
def _system_risk(decisions: List[ModuleDecision]) -> RiskLevel:
"""Определяет системный уровень риска на основе распределения модулей."""
if not decisions:
return RiskLevel.LOW
total = len(decisions)
critical_ratio = sum(1 for d in decisions if d.risk_level == RiskLevel.CRITICAL) / total
high_ratio = sum(1 for d in decisions if d.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL)) / total
if critical_ratio > 0.3:
return RiskLevel.CRITICAL
if high_ratio > 0.5:
return RiskLevel.HIGH
if high_ratio > 0.2:
return RiskLevel.MEDIUM
return RiskLevel.LOW