arch-researcher/decision_engine.py
2026-04-27 23:44:22 +05:00

259 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Модуль DecisionEngine — классификация архитектурного риска и
выбор стратегии модернизации.
Реализует многокритериальный анализ на основе набора метрик,
полученных от MetricsEngine и DependencyAnalyzer.
Логика принятия решений:
Каждый модуль получает «очки риска» по нескольким независимым
критериям. Итоговый уровень риска определяется числом набранных
очков, что обеспечивает интерпретируемость и прозрачность решения.
Стратегии (по Lehman & Belady, 1985; Fowler, 2018):
LOW / MEDIUM → KEEP / REFACTOR
HIGH → REENGINEER
CRITICAL → REPLACE
"""
import logging
from typing import Dict, List, Tuple
from config import AnalyzerConfig, DEFAULT_CONFIG
from models import (
AnalysisReport,
ModuleDecision,
ModuleMetrics,
ModernizationStrategy,
RiskLevel,
)
logger = logging.getLogger(__name__)
class DecisionEngine:
"""
Классифицирует модули по уровню архитектурного риска
и формирует рекомендации по стратегии модернизации.
"""
def __init__(self, config: AnalyzerConfig = DEFAULT_CONFIG):
self.config = config
self.t = config.thresholds
# ------------------------------------------------------------------
# Публичный API
# ------------------------------------------------------------------
def analyze(
self,
report: AnalysisReport,
cycles: List[List[str]],
) -> AnalysisReport:
"""
Заполняет report.decisions и системные агрегаты.
:param report: отчёт с заполненными modules
:param cycles: список циклов зависимостей от DependencyAnalyzer
:returns: обновлённый отчёт
"""
decisions: List[ModuleDecision] = []
for module_name, module in report.modules.items():
score, reasons = self._score_module(module)
risk = self._score_to_risk(score)
strategy = self._risk_to_strategy(risk)
decision = ModuleDecision(
module_name=module_name,
risk_level=risk,
strategy=strategy,
reasons=reasons,
priority=self._compute_priority(score, module),
)
decisions.append(decision)
# Сортируем по приоритету (меньше = важнее)
decisions.sort(key=lambda d: d.priority)
report.decisions = decisions
report.dependency_cycles = cycles
# Системные агрегаты
report.total_files = sum(len(m.files) for m in report.modules.values())
report.total_nloc = sum(m.total_nloc for m in report.modules.values())
report.total_functions = sum(m.total_functions for m in report.modules.values())
all_ccn = [
f.ccn
for m in report.modules.values()
for f in m.functions
]
report.avg_system_ccn = sum(all_ccn) / len(all_ccn) if all_ccn else 0.0
report.system_risk_level = self._system_risk(decisions)
report.recommended_system_strategy = self._risk_to_strategy(report.system_risk_level)
logger.info(
"DecisionEngine: системный риск=%s, стратегия=%s",
report.system_risk_level.value,
report.recommended_system_strategy.name,
)
return report
# ------------------------------------------------------------------
# Оценка отдельного модуля
# ------------------------------------------------------------------
def _score_module(self, module: ModuleMetrics) -> Tuple[int, List[str]]:
"""
Начисляет очки риска по каждому критерию.
Возвращает (суммарный_счёт, список_причин).
Каждый критерий даёт 1 (warn) или 2 (critical) очка.
"""
score = 0
reasons: List[str] = []
# --- Цикломатическая сложность ---
if module.max_ccn > self.t.ccn_critical:
score += 2
reasons.append(
f"Критическая CCN: max={module.max_ccn} (порог {self.t.ccn_critical})"
)
elif module.avg_ccn > self.t.ccn_warn:
score += 1
reasons.append(
f"Повышенная средняя CCN: avg={module.avg_ccn:.1f} (порог {self.t.ccn_warn})"
)
# --- Доля сложных функций ---
if module.heavy_functions_ratio > self.t.heavy_functions_ratio_critical:
score += 2
reasons.append(
f"Критическая доля сложных функций: "
f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_critical:.0%})"
)
elif module.heavy_functions_ratio > self.t.heavy_functions_ratio_warn:
score += 1
reasons.append(
f"Высокая доля сложных функций: "
f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_warn:.0%})"
)
# --- Исходящая связность (fan-out) ---
if module.coupling_out > self.t.coupling_out_critical:
score += 2
reasons.append(
f"Критическая исходящая связность: C_out={module.coupling_out} "
f"(порог {self.t.coupling_out_critical})"
)
elif module.coupling_out > self.t.coupling_out_warn:
score += 1
reasons.append(
f"Высокая исходящая связность: C_out={module.coupling_out} "
f"(порог {self.t.coupling_out_warn})"
)
# --- Входящая связность (fan-in) ---
if module.coupling_in > self.t.coupling_in_critical:
score += 2
reasons.append(
f"Критическая входящая связность: C_in={module.coupling_in} "
f"(порог {self.t.coupling_in_critical}) — нестабильный центральный узел"
)
elif module.coupling_in > self.t.coupling_in_warn:
score += 1
reasons.append(
f"Высокая входящая связность: C_in={module.coupling_in} "
f"(порог {self.t.coupling_in_warn})"
)
# --- Нестабильность (Instability, метрика Р. Мартина) ---
if module.instability > self.t.instability_critical:
score += 2
reasons.append(
f"Критическая нестабильность: I={module.instability:.2f} "
f"(порог {self.t.instability_critical})"
)
elif module.instability > self.t.instability_warn:
score += 1
reasons.append(
f"Высокая нестабильность: I={module.instability:.2f} "
f"(порог {self.t.instability_warn})"
)
# --- Циклические зависимости ---
if module.has_cycles:
score += 3
reasons.append("Участвует в циклических зависимостях — признак архитектурной деградации")
return score, reasons
# ------------------------------------------------------------------
# Преобразование очков в решения
# ------------------------------------------------------------------
@staticmethod
def _score_to_risk(score: int) -> RiskLevel:
"""Преобразует суммарный счёт в уровень риска."""
if score == 0:
return RiskLevel.LOW
if score <= 2:
return RiskLevel.MEDIUM
if score <= 5:
return RiskLevel.HIGH
return RiskLevel.CRITICAL
@staticmethod
def _risk_to_strategy(risk: RiskLevel) -> ModernizationStrategy:
"""
Определяет стратегию модернизации по уровню риска.
Логика основана на классификации Fowler (2018) и
Lehman & Belady (1985).
"""
return {
RiskLevel.LOW: ModernizationStrategy.KEEP,
RiskLevel.MEDIUM: ModernizationStrategy.REFACTOR,
RiskLevel.HIGH: ModernizationStrategy.REENGINEER,
RiskLevel.CRITICAL: ModernizationStrategy.REPLACE,
}[risk]
@staticmethod
def _compute_priority(score: int, module: ModuleMetrics) -> int:
"""
Приоритет в плане рефакторинга: меньше = выше приоритет.
Модули с циклами и высоким C_in получают наивысший приоритет
(они блокируют изменения во множестве других модулей).
"""
priority = 1000 - score * 100
# Модули с высоким fan-in важнее менять первыми —
# они наиболее болезненные для всей системы
if module.coupling_in > 0:
priority -= module.coupling_in * 10
if module.has_cycles:
priority -= 200
return max(priority, 1)
@staticmethod
def _system_risk(decisions: List[ModuleDecision]) -> RiskLevel:
"""Определяет системный уровень риска на основе распределения модулей."""
if not decisions:
return RiskLevel.LOW
total = len(decisions)
critical_ratio = sum(1 for d in decisions if d.risk_level == RiskLevel.CRITICAL) / total
high_ratio = sum(1 for d in decisions if d.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL)) / total
if critical_ratio > 0.3:
return RiskLevel.CRITICAL
if high_ratio > 0.5:
return RiskLevel.HIGH
if high_ratio > 0.2:
return RiskLevel.MEDIUM
return RiskLevel.LOW