""" Модуль DecisionEngine — классификация архитектурного риска и выбор стратегии модернизации. Реализует многокритериальный анализ на основе набора метрик, полученных от MetricsEngine и DependencyAnalyzer. Логика принятия решений: Каждый модуль получает «очки риска» по нескольким независимым критериям. Итоговый уровень риска определяется числом набранных очков, что обеспечивает интерпретируемость и прозрачность решения. Стратегии (по Lehman & Belady, 1985; Fowler, 2018): LOW / MEDIUM → KEEP / REFACTOR HIGH → REENGINEER CRITICAL → REPLACE """ import logging from typing import Dict, List, Tuple from config import AnalyzerConfig, DEFAULT_CONFIG from models import ( AnalysisReport, ModuleDecision, ModuleMetrics, ModernizationStrategy, RiskLevel, ) logger = logging.getLogger(__name__) class DecisionEngine: """ Классифицирует модули по уровню архитектурного риска и формирует рекомендации по стратегии модернизации. """ def __init__(self, config: AnalyzerConfig = DEFAULT_CONFIG): self.config = config self.t = config.thresholds # ------------------------------------------------------------------ # Публичный API # ------------------------------------------------------------------ def analyze( self, report: AnalysisReport, cycles: List[List[str]], ) -> AnalysisReport: """ Заполняет report.decisions и системные агрегаты. :param report: отчёт с заполненными modules :param cycles: список циклов зависимостей от DependencyAnalyzer :returns: обновлённый отчёт """ decisions: List[ModuleDecision] = [] for module_name, module in report.modules.items(): score, reasons = self._score_module(module) risk = self._score_to_risk(score) strategy = self._risk_to_strategy(risk) decision = ModuleDecision( module_name=module_name, risk_level=risk, strategy=strategy, reasons=reasons, priority=self._compute_priority(score, module), ) decisions.append(decision) # Сортируем по приоритету (меньше = важнее) decisions.sort(key=lambda d: d.priority) report.decisions = decisions report.dependency_cycles = cycles # Системные агрегаты report.total_files = sum(len(m.files) for m in report.modules.values()) report.total_nloc = sum(m.total_nloc for m in report.modules.values()) report.total_functions = sum(m.total_functions for m in report.modules.values()) all_ccn = [ f.ccn for m in report.modules.values() for f in m.functions ] report.avg_system_ccn = sum(all_ccn) / len(all_ccn) if all_ccn else 0.0 report.system_risk_level = self._system_risk(decisions) report.recommended_system_strategy = self._risk_to_strategy(report.system_risk_level) logger.info( "DecisionEngine: системный риск=%s, стратегия=%s", report.system_risk_level.value, report.recommended_system_strategy.name, ) return report # ------------------------------------------------------------------ # Оценка отдельного модуля # ------------------------------------------------------------------ def _score_module(self, module: ModuleMetrics) -> Tuple[int, List[str]]: """ Начисляет очки риска по каждому критерию. Возвращает (суммарный_счёт, список_причин). Каждый критерий даёт 1 (warn) или 2 (critical) очка. """ score = 0 reasons: List[str] = [] # --- Цикломатическая сложность --- if module.max_ccn > self.t.ccn_critical: score += 2 reasons.append( f"Критическая CCN: max={module.max_ccn} (порог {self.t.ccn_critical})" ) elif module.avg_ccn > self.t.ccn_warn: score += 1 reasons.append( f"Повышенная средняя CCN: avg={module.avg_ccn:.1f} (порог {self.t.ccn_warn})" ) # --- Доля сложных функций --- if module.heavy_functions_ratio > self.t.heavy_functions_ratio_critical: score += 2 reasons.append( f"Критическая доля сложных функций: " f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_critical:.0%})" ) elif module.heavy_functions_ratio > self.t.heavy_functions_ratio_warn: score += 1 reasons.append( f"Высокая доля сложных функций: " f"{module.heavy_functions_ratio:.0%} (порог {self.t.heavy_functions_ratio_warn:.0%})" ) # --- Исходящая связность (fan-out) --- if module.coupling_out > self.t.coupling_out_critical: score += 2 reasons.append( f"Критическая исходящая связность: C_out={module.coupling_out} " f"(порог {self.t.coupling_out_critical})" ) elif module.coupling_out > self.t.coupling_out_warn: score += 1 reasons.append( f"Высокая исходящая связность: C_out={module.coupling_out} " f"(порог {self.t.coupling_out_warn})" ) # --- Входящая связность (fan-in) --- if module.coupling_in > self.t.coupling_in_critical: score += 2 reasons.append( f"Критическая входящая связность: C_in={module.coupling_in} " f"(порог {self.t.coupling_in_critical}) — нестабильный центральный узел" ) elif module.coupling_in > self.t.coupling_in_warn: score += 1 reasons.append( f"Высокая входящая связность: C_in={module.coupling_in} " f"(порог {self.t.coupling_in_warn})" ) # --- Нестабильность (Instability, метрика Р. Мартина) --- if module.instability > self.t.instability_critical: score += 2 reasons.append( f"Критическая нестабильность: I={module.instability:.2f} " f"(порог {self.t.instability_critical})" ) elif module.instability > self.t.instability_warn: score += 1 reasons.append( f"Высокая нестабильность: I={module.instability:.2f} " f"(порог {self.t.instability_warn})" ) # --- Циклические зависимости --- if module.has_cycles: score += 3 reasons.append("Участвует в циклических зависимостях — признак архитектурной деградации") return score, reasons # ------------------------------------------------------------------ # Преобразование очков в решения # ------------------------------------------------------------------ @staticmethod def _score_to_risk(score: int) -> RiskLevel: """Преобразует суммарный счёт в уровень риска.""" if score == 0: return RiskLevel.LOW if score <= 2: return RiskLevel.MEDIUM if score <= 5: return RiskLevel.HIGH return RiskLevel.CRITICAL @staticmethod def _risk_to_strategy(risk: RiskLevel) -> ModernizationStrategy: """ Определяет стратегию модернизации по уровню риска. Логика основана на классификации Fowler (2018) и Lehman & Belady (1985). """ return { RiskLevel.LOW: ModernizationStrategy.KEEP, RiskLevel.MEDIUM: ModernizationStrategy.REFACTOR, RiskLevel.HIGH: ModernizationStrategy.REENGINEER, RiskLevel.CRITICAL: ModernizationStrategy.REPLACE, }[risk] @staticmethod def _compute_priority(score: int, module: ModuleMetrics) -> int: """ Приоритет в плане рефакторинга: меньше = выше приоритет. Модули с циклами и высоким C_in получают наивысший приоритет (они блокируют изменения во множестве других модулей). """ priority = 1000 - score * 100 # Модули с высоким fan-in важнее менять первыми — # они наиболее болезненные для всей системы if module.coupling_in > 0: priority -= module.coupling_in * 10 if module.has_cycles: priority -= 200 return max(priority, 1) @staticmethod def _system_risk(decisions: List[ModuleDecision]) -> RiskLevel: """Определяет системный уровень риска на основе распределения модулей.""" if not decisions: return RiskLevel.LOW total = len(decisions) critical_ratio = sum(1 for d in decisions if d.risk_level == RiskLevel.CRITICAL) / total high_ratio = sum(1 for d in decisions if d.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL)) / total if critical_ratio > 0.3: return RiskLevel.CRITICAL if high_ratio > 0.5: return RiskLevel.HIGH if high_ratio > 0.2: return RiskLevel.MEDIUM return RiskLevel.LOW