Source code for min_ratio_cycle.solver

"""
Enhanced Min Ratio Cycle Solver with improved features and robustness.

This module provides a comprehensive implementation of minimum cost-to-time ratio
cycle detection with the following enhancements:
- Better error handling and validation
- Performance optimizations for sparse graphs
- Structured logging and metrics
- Configuration management
- Multiple solving modes and options
- Comprehensive debugging utilities
"""

from __future__ import annotations

import logging
import math
import time
from dataclasses import dataclass, field
from enum import Enum
from fractions import Fraction
from typing import Any

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import psutil

from min_ratio_cycle.exceptions import (
    ConvergenceError,
    NumericalInstabilityError,
    ResourceExhaustionError,
    TimeoutError,
)

# Version and metadata
__version__ = "0.2.0"
__author__ = "Enhanced by Claude"


# ==============================
# Configuration and Types
# ==============================


[docs] class SolverMode(Enum): """ Solver mode enumeration. """ AUTO = "auto" # Automatic selection based on weight types EXACT = "exact" # Force exact rational arithmetic NUMERIC = "numeric" # Force floating-point arithmetic APPROXIMATE = "approximate" # Fast approximation
[docs] class LogLevel(Enum): """ Logging level enumeration. """ NONE = 0 ERROR = 1 WARN = 2 INFO = 3 DEBUG = 4
[docs] @dataclass class SolverConfig: """ Configuration for the solver. """ # Numeric mode parameters numeric_max_iter: int = 60 numeric_tolerance: float = 1e-12 numeric_cycle_slack: float = 1e-15 # Exact mode parameters exact_max_denominator: int | None = None exact_max_steps: int | None = None # Performance parameters sparse_threshold: float = 0.1 # Use sparse optimizations below this density enable_preprocessing: bool = True enable_early_termination: bool = True # Validation parameters validate_cycles: bool = True repair_cycles: bool = True # Logging and monitoring log_level: LogLevel = LogLevel.INFO collect_metrics: bool = True # Advanced options use_kahan_summation: bool = True # For numerical stability max_solve_time: float | None = None # Maximum time in seconds max_memory_mb: int | None = None # Maximum memory usage
[docs] @dataclass(frozen=True) class Edge: """ Directed edge with cost and (strictly positive) transit time. For exact solver, both cost and time should be integers or rational numbers. For numeric solver, floats are acceptable. """ u: int v: int cost: float | int | Fraction time: float | int | Fraction def __post_init__(self): """ Validate edge parameters. """ if not isinstance(self.u, int) or not isinstance(self.v, int): raise ValueError("Vertex indices must be integers") if self.u < 0 or self.v < 0: raise ValueError("Vertex indices must be non-negative") # Convert time to appropriate type and validate time_val = ( float(self.time) if not isinstance(self.time, Fraction) else self.time ) if (isinstance(time_val, (int, float)) and time_val <= 0) or ( isinstance(time_val, Fraction) and time_val <= 0 ): raise ValueError("Edge transit time must be strictly positive")
[docs] @dataclass class SolverMetrics: """ Metrics collected during solving. """ solve_time: float = 0.0 iterations: int = 0 mode_used: str = "" preprocessing_time: float = 0.0 graph_properties: dict[str, Any] = field(default_factory=dict) memory_peak: int | None = None
[docs] @dataclass class SolverResult: """ Result from solver with additional metadata. """ cycle: list[int] sum_cost: float | Fraction sum_time: float | Fraction ratio: float | Fraction success: bool = True error_message: str = "" metrics: SolverMetrics | None = None config_used: SolverConfig | None = None def __iter__(self): """ Allow tuple unpacking of the primary result fields. """ yield self.cycle yield self.sum_cost yield self.sum_time yield self.ratio
# ============================== # Enhanced Solver Implementation # ==============================
[docs] class MinRatioCycleSolver: """ Enhanced minimum cost-to-time ratio cycle solver. Features: - Automatic mode selection (exact vs numeric) - Sparse graph optimizations - Comprehensive error handling - Performance monitoring - Configurable behavior - Debugging utilities """ def __init__(self, n_vertices: int, config: SolverConfig | None = None): """ Initialize solver. Args: n_vertices: Number of vertices in the graph config: Optional configuration object """ if not isinstance(n_vertices, int) or n_vertices <= 0: raise ValueError("n_vertices must be a positive integer") self.n = n_vertices self.config = config or SolverConfig() self._edges: list[Edge] = [] # Setup logging self._setup_logging() # NumPy arrays (built lazily) self._arrays_built = False self._U: np.ndarray | None = None self._V: np.ndarray | None = None self._C: np.ndarray | None = None # float64 costs self._T: np.ndarray | None = None # float64 times self._Ci: np.ndarray | None = None # int64 costs (exact mode) self._Ti: np.ndarray | None = None # int64 times (exact mode) self._starts: np.ndarray | None = None # CSR format self._counts: np.ndarray | None = None # CSR format # State tracking self._all_int = True # Whether all weights are integers self._is_sparse = False self._last_result: SolverResult | None = None # Metrics self._metrics = SolverMetrics() if self.config.collect_metrics else None self.logger.info(f"Initialized solver with {n_vertices} vertices") def _build_numpy_arrays_once(self) -> None: """ Backward compatibility shim for tests expecting this method. """ self._build_arrays_if_needed() def _setup_logging(self) -> None: """ Setup structured logging. """ self.logger = logging.getLogger(f"{__name__}.{id(self)}") if self.config.log_level == LogLevel.NONE: self.logger.setLevel(logging.CRITICAL + 1) elif self.config.log_level == LogLevel.ERROR: self.logger.setLevel(logging.ERROR) elif self.config.log_level == LogLevel.WARN: self.logger.setLevel(logging.WARNING) elif self.config.log_level == LogLevel.INFO: self.logger.setLevel(logging.INFO) else: # DEBUG self.logger.setLevel(logging.DEBUG) if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) self.logger.addHandler(handler) # ============================== # Edge Management # ==============================
[docs] def add_edge( self, u: int, v: int, cost: float | int | Fraction, time: float | int | Fraction, ) -> None: """ Add a directed edge u -> v with given cost and time. Args: u, v: Vertex indices (must be in [0, n_vertices)) cost: Edge cost (can be negative) time: Edge time (must be positive) """ if not (0 <= u < self.n and 0 <= v < self.n): raise ValueError("u and v must be valid vertex indices") # Create edge (validation happens in Edge.__post_init__) edge = Edge(u, v, cost, time) # Update integer tracking if self._all_int: cost_is_int = isinstance(cost, (int, Fraction)) or ( isinstance(cost, float) and cost.is_integer() ) time_is_int = isinstance(time, (int, Fraction)) or ( isinstance(time, float) and time.is_integer() ) if not (cost_is_int and time_is_int): self._all_int = False self.logger.debug("Switched to numeric mode due to non-integer weights") self._edges.append(edge) self._arrays_built = False # Invalidate cached arrays self.logger.debug(f"Added edge {u} -> {v} (cost={cost}, time={time})")
[docs] def add_edges(self, edges: list[tuple[int, int, float | int, float | int]]) -> None: """ Add multiple edges at once for efficiency. """ for u, v, cost, time in edges: self.add_edge(u, v, cost, time)
[docs] def remove_edge(self, u: int, v: int) -> bool: """ Remove an edge between vertices u and v. Returns: True if edge was removed, False if not found """ for i, edge in enumerate(self._edges): if edge.u == u and edge.v == v: del self._edges[i] self._arrays_built = False self.logger.debug(f"Removed edge {u} -> {v}") return True return False
[docs] def clear_edges(self) -> None: """ Remove all edges from the graph. """ self._edges.clear() self._arrays_built = False self.logger.debug("Cleared all edges")
# ============================== # Graph Analysis # ==============================
[docs] def get_graph_properties(self) -> dict[str, Any]: """ Get comprehensive graph properties. """ if not self._edges: return {"error": "No edges in graph"} self._build_arrays_if_needed() if self._U is None: return {"error": "Failed to build arrays"} n, m = self.n, len(self._edges) density = m / (n * n) # Degree analysis in_degrees = np.bincount(self._V, minlength=n) out_degrees = np.bincount(self._U, minlength=n) # Weight analysis costs = [float(e.cost) for e in self._edges] times = [float(e.time) for e in self._edges] return { "vertices": n, "edges": m, "density": density, "is_sparse": density < self.config.sparse_threshold, "connectivity": { "isolated_vertices": int( np.sum((in_degrees == 0) & (out_degrees == 0)) ), "source_vertices": int(np.sum((in_degrees == 0) & (out_degrees > 0))), "sink_vertices": int(np.sum((in_degrees > 0) & (out_degrees == 0))), "avg_in_degree": float(np.mean(in_degrees)), "avg_out_degree": float(np.mean(out_degrees)), "max_in_degree": int(np.max(in_degrees)), "max_out_degree": int(np.max(out_degrees)), }, "weights": { "all_integer": self._all_int, "cost_range": (min(costs), max(costs)), "time_range": (min(times), max(times)), "cost_mean": sum(costs) / len(costs), "time_mean": sum(times) / len(times), "cost_std": np.std(costs), "time_std": np.std(times), }, }
[docs] def detect_issues(self) -> list[dict[str, str]]: """ Detect potential issues that might cause solve failures. """ issues = [] props = self.get_graph_properties() if "error" in props: issues.append({"level": "ERROR", "message": props["error"]}) return issues # Check connectivity conn = props["connectivity"] if conn["isolated_vertices"] > 0: issues.append( { "level": "WARNING", "message": f"{conn['isolated_vertices']} isolated vertices (no cycles possible through them)", } ) # Check for very sparse graphs if props["density"] < 0.01: issues.append( { "level": "INFO", "message": f"Very sparse graph (density: {props['density']:.4f})", } ) # Check weight ranges weights = props["weights"] cost_range = weights["cost_range"][1] - weights["cost_range"][0] time_ratio = ( weights["time_range"][1] / weights["time_range"][0] if weights["time_range"][0] > 0 else float("inf") ) if cost_range > 1e10: issues.append( { "level": "WARNING", "message": f"Very large cost range may cause numerical issues: {cost_range:.2e}", } ) if time_ratio > 1e6: issues.append( { "level": "WARNING", "message": f"Very large time ratio may cause numerical issues: {time_ratio:.2e}", } ) return issues
# ============================== # Array Building and Preprocessing # ============================== def _build_arrays_if_needed(self) -> None: """ Build NumPy arrays if not already built. """ if self._arrays_built: return start_time = time.time() if self._metrics else 0 m = len(self._edges) if m == 0: self.logger.warning("No edges to build arrays from") return # Build basic arrays U = np.empty(m, dtype=np.int64) V = np.empty(m, dtype=np.int64) C = np.empty(m, dtype=np.float64) T = np.empty(m, dtype=np.float64) for i, edge in enumerate(self._edges): U[i] = edge.u V[i] = edge.v C[i] = float(edge.cost) T[i] = float(edge.time) # Sort by destination for CSR format order = np.argsort(V, kind="stable") U, V, C, T = U[order], V[order], C[order], T[order] # Build CSR structure counts = np.bincount(V, minlength=self.n) starts = np.empty(self.n, dtype=np.int64) starts[0] = 0 if self.n > 1: np.cumsum(counts[:-1], out=starts[1:]) # Store arrays self._U, self._V, self._C, self._T = U, V, C, T self._starts, self._counts = starts, counts # Build integer arrays for exact mode if needed if self._all_int: self._Ci = np.array( [int(edge.cost) for edge in self._edges], dtype=np.int64 )[order] self._Ti = np.array( [int(edge.time) for edge in self._edges], dtype=np.int64 )[order] # Check if sparse density = m / (self.n * self.n) self._is_sparse = density < self.config.sparse_threshold self._arrays_built = True if self._metrics: self._metrics.preprocessing_time = time.time() - start_time self.logger.debug( f"Built arrays: {m} edges, density={density:.4f}, sparse={self._is_sparse}" ) def _preprocess_graph(self) -> None: """ Apply graph preprocessing optimizations. """ if not self.config.enable_preprocessing: return original_edges = len(self._edges) # Remove dominated edges (same endpoints, one dominates in both cost and time) if self.config.enable_preprocessing: self._remove_dominated_edges() removed = original_edges - len(self._edges) if removed > 0: self.logger.info(f"Preprocessing removed {removed} dominated edges") self._arrays_built = False # Need to rebuild arrays def _remove_dominated_edges(self) -> None: """ Remove edges dominated by others with same endpoints. """ edge_groups: dict[tuple[int, int], list[int]] = {} # Group edges by (u, v) pairs for i, edge in enumerate(self._edges): key = (edge.u, edge.v) if key not in edge_groups: edge_groups[key] = [] edge_groups[key].append(i) indices_to_remove = set() # For each group, find dominated edges for indices in edge_groups.values(): if len(indices) <= 1: continue edges = [self._edges[i] for i in indices] for i in range(len(edges)): for j in range(len(edges)): if i == j: continue edge_i, edge_j = edges[i], edges[j] # Check if edge_i dominates edge_j if ( float(edge_i.cost) <= float(edge_j.cost) and float(edge_i.time) <= float(edge_j.time) and ( float(edge_i.cost) < float(edge_j.cost) or float(edge_i.time) < float(edge_j.time) ) ): indices_to_remove.add(indices[j]) # Remove dominated edges if indices_to_remove: self._edges = [ edge for i, edge in enumerate(self._edges) if i not in indices_to_remove ] # ============================== # Main Solve Method # ==============================
[docs] def solve( self, mode: SolverMode | str = SolverMode.AUTO, target_ratio: float | None = None, **kwargs, ) -> SolverResult: """ Find minimum cost-to-time ratio cycle. Args: mode: Solving mode (auto, exact, numeric, approximate) target_ratio: Stop if better ratio found (early termination) **kwargs: Additional parameters passed to specific solvers Returns: SolverResult with cycle, costs, and metadata """ if isinstance(mode, str): mode = SolverMode(mode) start_time = time.time() if self.config.max_memory_mb is not None: mem = psutil.Process().memory_info().rss / (1024 * 1024) if mem > self.config.max_memory_mb: raise ResourceExhaustionError( "Memory limit exceeded before solve", resource="memory", limit=self.config.max_memory_mb, usage=mem, suggested_fix="increase max_memory_mb", ) try: # Preprocessing self._preprocess_graph() self._build_arrays_if_needed() if not self._edges: raise ValueError("Graph has no edges") # Use numeric solver for all cases actual_mode = SolverMode.NUMERIC self.logger.info(f"Starting solve in {actual_mode.value} mode") try: result = self._solve_numeric(target_ratio=target_ratio, **kwargs) except (NumericalInstabilityError, ConvergenceError) as e: self.logger.warning("Numeric solve failed: %s", e) if self._all_int: self.logger.info("Retrying with exact mode") result = self._solve_exact(**kwargs) else: self.logger.info("Relaxing tolerance and retrying") old_tol = self.config.numeric_tolerance self.config.numeric_tolerance *= 10 result = self._solve_numeric(target_ratio=target_ratio, **kwargs) self.config.numeric_tolerance = old_tol if not result.success: raise NumericalInstabilityError( "Solver failed in all recovery attempts", component="solve", suggested_fix="check input weights", recovery_hint="try exact mode or scale weights", ) from e # Validate result if requested if self.config.validate_cycles and result.success: result = self._validate_result(result) # Update metrics solve_time = time.time() - start_time if self._metrics: self._metrics.solve_time = solve_time self._metrics.mode_used = actual_mode.value self._metrics.graph_properties = self.get_graph_properties() result.metrics = self._metrics result.config_used = self.config self._last_result = result if result.success: self.logger.info( f"Solved successfully in {solve_time:.4f}s, ratio={result.ratio}" ) return result else: self.logger.error(f"Solve failed: {result.error_message}") raise RuntimeError(result.error_message) except TimeoutError as e: self.logger.error("Timeout: %s", e) raise except ResourceExhaustionError as e: self.logger.error("Resource limit hit: %s", e) raise except Exception as e: self.logger.error("Solver exception: %s", e) raise
# ============================== # Exact Mode Implementation # ============================== def _solve_exact(self, **kwargs) -> SolverResult: """ Solve using exact rational arithmetic (Stern-Brocot search). """ if not self._all_int: return SolverResult( cycle=[], sum_cost=0, sum_time=0, ratio=float("inf"), success=False, error_message="Exact mode requires integer weights", ) if self._Ci is None or self._Ti is None: raise RuntimeError( "Integer weight arrays (_Ci, _Ti) must be initialized before solving" ) max_den = kwargs.get("max_denominator", self.config.exact_max_denominator) max_steps = kwargs.get("max_steps", self.config.exact_max_steps) try: cycle, sum_c, sum_t, (a, b), ratio_float = self._stern_brocot_search( max_den, max_steps ) # Close cycle if needed if cycle and (len(cycle) == 1 or cycle[0] != cycle[-1]): cycle.append(cycle[0]) return SolverResult( cycle=cycle, sum_cost=float(sum_c), sum_time=float(sum_t), ratio=float(sum_c) / float(sum_t), success=True, ) except Exception as e: return SolverResult( cycle=[], sum_cost=0, sum_time=0, ratio=float("inf"), success=False, error_message=f"Exact solver failed: {str(e)}", ) def _stern_brocot_search( self, max_den: int | None, max_steps: int | None ) -> tuple[list[int], int, int, tuple[int, int], float]: """ Implement Stern-Brocot tree search for exact optimal ratio. """ if self._Ci is None or self._Ti is None: raise RuntimeError( "Integer weight arrays (_Ci, _Ti) must be initialized before searching" ) # Set defaults if max_den is None: t_max = int(np.max(self._Ti)) max_den = self.n * max(1, t_max) if max_steps is None: max_steps = 2 * max_den + 10 # Initial bracket ratios = self._Ci.astype(np.float64) / self._Ti.astype(np.float64) aL, bL = int(math.floor(np.min(ratios))) - 1, 1 aR, bR = int(math.ceil(np.max(ratios))) + 1, 1 # Verify bracket validity if self._has_negative_cycle_exact(aL, bL): raise RuntimeError("Lower bound has negative cycle") if not self._has_negative_cycle_exact(aR, bR): raise RuntimeError("Upper bound has no negative cycle") iterations = 0 for _ in range(max_steps): iterations += 1 aM, bM = aL + aR, bL + bR if bM > max_den: # Check if L is exact zero_cycle = self._find_zero_cycle_exact(aL, bL) if zero_cycle is not None: cycle, sum_c, sum_t = zero_cycle g = math.gcd(aL, bL) return cycle, sum_c, sum_t, (aL // g, bL // g), sum_c / sum_t # Move towards R with largest feasible step k = max(1, (max_den - bL) // bR) aM, bM = aL + k * aR, bL + k * bR if self._has_negative_cycle_exact(aM, bM): aR, bR = aM, bM continue # Check for exact solution zero_cycle = self._find_zero_cycle_exact(aM, bM) if zero_cycle is not None: cycle, sum_c, sum_t = zero_cycle g = math.gcd(aM, bM) if self._metrics: self._metrics.iterations = iterations return cycle, sum_c, sum_t, (aM // g, bM // g), sum_c / sum_t aL, bL = aM, bM raise RuntimeError("Stern-Brocot search did not converge") def _combine_weights_exact(self, a: int, b: int) -> np.ndarray: combo = b * self._Ci - a * self._Ti if not np.isfinite(combo).all(): raise NumericalInstabilityError( "Weight combination overflowed", component="solver", suggested_fix="Scale weights or use smaller coefficients", recovery_hint="Normalize edge weights before solving", ) combo = np.clip(combo, np.iinfo(np.int64).min, np.iinfo(np.int64).max) return combo.astype(np.int64) def _has_negative_cycle_exact(self, a: int, b: int) -> bool: """ Check for negative cycle with exact integer arithmetic. """ if self._U is None or self._V is None: raise RuntimeError( "Edge endpoint arrays (_U, _V) must be initialized before cycle checks" ) if self._Ci is None or self._Ti is None: raise RuntimeError( "Integer weight arrays (_Ci, _Ti) must be initialized before cycle checks" ) if self._starts is None or self._counts is None: raise RuntimeError( "Segment arrays (_starts, _counts) must be initialized before cycle checks" ) W = self._combine_weights_exact(a, b) dist = np.zeros(self.n, dtype=np.int64) def relax_once() -> bool: cand = dist[self._U] + W improve = cand < dist[self._V] if not np.any(improve): return False # Use numerical stable min reduction cand_imp = np.where(improve, cand, np.iinfo(np.int64).max) mins = np.minimum.reduceat(cand_imp, self._starts) mins_rep = np.repeat(mins, self._counts) good = improve & (cand_imp == mins_rep) if not np.any(good): return False # Update distances for selected vertices idx = np.flatnonzero(good) dist[self._V[idx]] = cand[idx] return True # Bellman-Ford: n-1 passes + detection pass for _ in range(self.n - 1): if not relax_once(): break return relax_once() # If still can relax, negative cycle exists def _find_zero_cycle_exact( self, a: int, b: int ) -> tuple[list[int], int, int] | None: """ Find zero-weight cycle in equality subgraph. """ # First get potentials ok, dist = self._compute_potentials_exact(a, b) if not ok: return None if self._U is None or self._V is None: raise RuntimeError( "Edge endpoint arrays (_U, _V) must be initialized before cycle extraction" ) if self._Ci is None or self._Ti is None: raise RuntimeError( "Integer weight arrays (_Ci, _Ti) must be initialized before cycle extraction" ) W = self._combine_weights_exact(a, b) equal_mask = (dist[self._U] + W) == dist[self._V] if not np.any(equal_mask): return None # Build equality subgraph adj = [[] for _ in range(self.n)] eq_indices = np.flatnonzero(equal_mask) for i in eq_indices: adj[self._U[i]].append(self._V[i]) # DFS for cycle color = np.zeros(self.n, dtype=np.int8) parent = np.full(self.n, -1, dtype=np.int64) def dfs(u: int) -> list[int] | None: color[u] = 1 # Gray for v in adj[u]: if color[v] == 0: # White parent[v] = u cycle = dfs(v) if cycle is not None: return cycle elif color[v] == 1: # Gray - back edge found # Extract cycle cycle_nodes = [v] x = u while x != v and x != -1: cycle_nodes.append(x) x = parent[x] cycle_nodes.reverse() return cycle_nodes color[u] = 2 # Black return None for start in range(self.n): if color[start] == 0: cycle = dfs(start) if cycle: return self._compute_cycle_weights_exact(cycle) return None def _compute_potentials_exact(self, a: int, b: int) -> tuple[bool, np.ndarray]: """ Compute shortest path potentials with exact arithmetic. """ if self._U is None or self._V is None: raise RuntimeError( "Edge endpoint arrays (_U, _V) must be initialized before computing potentials" ) if self._Ci is None or self._Ti is None: raise RuntimeError( "Integer weight arrays (_Ci, _Ti) must be initialized before computing potentials" ) if self._starts is None or self._counts is None: raise RuntimeError( "Segment arrays (_starts, _counts) must be initialized before computing potentials" ) W = self._combine_weights_exact(a, b) dist = np.zeros(self.n, dtype=np.int64) def relax_once() -> bool: cand = dist[self._U] + W improve = cand < dist[self._V] if not np.any(improve): return False cand_imp = np.where(improve, cand, np.iinfo(np.int64).max) mins = np.minimum.reduceat(cand_imp, self._starts) mins_rep = np.repeat(mins, self._counts) good = improve & (cand_imp == mins_rep) if not np.any(good): return False idx = np.flatnonzero(good) dist[self._V[idx]] = cand[idx] return True # Standard Bellman-Ford for _ in range(self.n - 1): if not relax_once(): break # Check for negative cycles if relax_once(): return False, dist return True, dist def _compute_cycle_weights_exact( self, cycle: list[int] ) -> tuple[list[int], int, int]: """ Compute exact cycle weights. """ sum_cost, sum_time = 0, 0 for i in range(len(cycle)): u = cycle[i] v = cycle[(i + 1) % len(cycle)] # Find edge u -> v edge_found = False for edge in self._edges: if edge.u == u and edge.v == v: sum_cost += int(edge.cost) sum_time += int(edge.time) edge_found = True break if not edge_found: raise RuntimeError(f"Edge {u} -> {v} not found in cycle extraction") return cycle, sum_cost, sum_time # ============================== # Numeric Mode Implementation # ============================== def _solve_numeric( self, target_ratio: float | None = None, **kwargs ) -> SolverResult: """ Solve using numeric binary search (Lawler's algorithm). """ lambda_lo = kwargs.get("lambda_lo") lambda_hi = kwargs.get("lambda_hi") max_iter = kwargs.get("max_iter", self.config.numeric_max_iter) tol = kwargs.get("tolerance", self.config.numeric_tolerance) slack = kwargs.get("cycle_slack", self.config.numeric_cycle_slack) try: start_time = time.time() process = psutil.Process() # Determine search bounds if lambda_lo is None or lambda_hi is None: costs = self._C times = self._T ratios = costs / times lo = float(np.min(ratios) - 1.0) hi = float(np.max(ratios) + 1.0) lambda_lo = lambda_lo or lo lambda_hi = lambda_hi or hi best_cycle = None iterations = 0 # Binary search on lambda for iteration in range(max_iter): if ( self.config.max_solve_time is not None and time.time() - start_time > self.config.max_solve_time ): raise TimeoutError( "Numeric solve exceeded time limit", time_elapsed=time.time() - start_time, time_limit=self.config.max_solve_time, operation="numeric_solve", ) if self.config.max_memory_mb is not None: mem = process.memory_info().rss / (1024 * 1024) if mem > self.config.max_memory_mb: raise ResourceExhaustionError( "Numeric solve exceeded memory limit", resource="memory", limit=self.config.max_memory_mb, usage=mem, ) iterations += 1 mid = (lambda_lo + lambda_hi) / 2.0 has_neg, cycle_data = self._detect_negative_cycle_numeric(mid, slack) if has_neg: lambda_hi = mid if cycle_data is not None: best_cycle = cycle_data # Early termination check if ( target_ratio is not None and self.config.enable_early_termination and cycle_data[2] < target_ratio ): # ratio < target self.logger.debug( "Early termination at iteration %d", iteration ) break else: lambda_lo = mid if lambda_hi - lambda_lo <= tol: break # Get final result if best_cycle is None: # Try once more at the upper bound has_neg, cycle_data = self._detect_negative_cycle_numeric( lambda_hi, slack ) if has_neg and cycle_data is not None: best_cycle = cycle_data if best_cycle is None: raise NumericalInstabilityError( "No negative cycle found", component="numeric_solve" ) cycle, sum_cost, sum_time, ratio = best_cycle # Close cycle if needed if cycle and (len(cycle) == 1 or cycle[0] != cycle[-1]): cycle.append(cycle[0]) if self._metrics: self._metrics.iterations = iterations # Convergence check if iteration == max_iter - 1 and lambda_hi - lambda_lo > tol: raise ConvergenceError( "Numeric solver failed to converge", algorithm_name="Lawler", max_iterations=max_iter, tolerance=tol, final_error=lambda_hi - lambda_lo, ) return SolverResult( cycle=cycle, sum_cost=sum_cost, sum_time=sum_time, ratio=ratio, success=True, ) except Exception as e: raise NumericalInstabilityError( "Numeric solver failed", component="numeric_solve" ) from e def _detect_negative_cycle_numeric( self, lam: float, slack: float = 0.0 ) -> tuple[bool, tuple[list[int], float, float, float] | None]: """ Detect negative cycle using vectorized Bellman-Ford. """ if self._U is None or self._V is None: raise RuntimeError( "Edge endpoint arrays (_U, _V) must be initialized before numeric cycle detection" ) if self._C is None or self._T is None: raise RuntimeError( "Weight arrays (_C, _T) must be initialized before numeric cycle detection" ) if self._starts is None or self._counts is None: raise RuntimeError( "Segment arrays (_starts, _counts) must be initialized before numeric cycle detection" ) n = self.n W = self._C - lam * self._T # Use Kahan summation if configured if self.config.use_kahan_summation: dist = np.zeros(n, dtype=np.float64) compensation = np.zeros(n, dtype=np.float64) # For Kahan summation else: dist = np.zeros(n, dtype=np.float64) compensation = None pred = np.full(n, -1, dtype=np.int64) def relax_once_kahan(update_pred: bool) -> bool: """ Relaxation step with Kahan summation for numerical stability. """ cand = dist[self._U] + W improve = cand < (dist[self._V] - slack) if not np.any(improve): return False # Segment-wise minimum selection cand_imp = np.where(improve, cand, np.inf) mins = np.full(self.n, np.inf) for v in range(self.n): start = self._starts[v] cnt = self._counts[v] if cnt > 0: mins[v] = np.min(cand_imp[start : start + cnt]) mins_rep = np.repeat(mins, self._counts) good = improve & (cand_imp == mins_rep) # First occurrence in each segment cs = np.cumsum(good.astype(np.int64)) base = np.empty_like(self._starts) base[0] = 0 if self._starts.size > 1: base[1:] = cs[self._starts[1:] - 1] first_mask = good & (cs == (np.repeat(base, self._counts) + 1)) if not np.any(first_mask): return False # Update with Kahan summation idx = np.flatnonzero(first_mask) v_sel = self._V[idx] new_dist = cand[idx] if compensation is not None: # Kahan summation update y = new_dist - dist[v_sel] - compensation[v_sel] t = dist[v_sel] + y compensation[v_sel] = (t - dist[v_sel]) - y dist[v_sel] = t else: dist[v_sel] = new_dist if update_pred: pred[v_sel] = self._U[idx] return True def relax_once_standard(update_pred: bool) -> bool: """ Standard relaxation step. """ cand = dist[self._U] + W improve = cand < (dist[self._V] - slack) if not np.any(improve): return False cand_imp = np.where(improve, cand, np.inf) mins = np.full(self.n, np.inf) for v in range(self.n): start = self._starts[v] cnt = self._counts[v] if cnt > 0: mins[v] = np.min(cand_imp[start : start + cnt]) mins_rep = np.repeat(mins, self._counts) good = improve & (cand_imp == mins_rep) cs = np.cumsum(good.astype(np.int64)) base = np.empty_like(self._starts) base[0] = 0 if self._starts.size > 1: base[1:] = cs[self._starts[1:] - 1] first_mask = good & (cs == (np.repeat(base, self._counts) + 1)) if not np.any(first_mask): return False idx = np.flatnonzero(first_mask) v_sel = self._V[idx] dist[v_sel] = cand[idx] if update_pred: pred[v_sel] = self._U[idx] return True # Choose relaxation method relax_once = ( relax_once_kahan if compensation is not None else relax_once_standard ) # Bellman-Ford iterations for _ in range(n - 1): if not relax_once(update_pred=True): break # Detection pass changed = relax_once(update_pred=True) if not changed: return False, None # Extract cycle from predecessors touched = np.where(pred != -1)[0] if touched.size == 0: return True, None # Walk to enter a cycle x = int(touched[-1]) for _ in range(n): if pred[x] == -1: break x = int(pred[x]) # Extract cycle seen = {} sequence = [] cur = x while cur not in seen and cur != -1: seen[cur] = len(sequence) sequence.append(cur) cur = int(pred[cur]) if pred[cur] != -1 else -1 if cur == -1: return True, None cycle = sequence[seen[cur] :][::-1] # Compute cycle weights sum_cost, sum_time = self._compute_cycle_weights_float(cycle) if sum_time <= 0: return True, None ratio = sum_cost / sum_time return True, (cycle, sum_cost, sum_time, ratio) def _compute_cycle_weights_float(self, cycle: list[int]) -> tuple[float, float]: """ Compute cycle weights using cached edge map. """ if not hasattr(self, "_edge_map") or self._edge_map is None: self._build_edge_map() sum_cost, sum_time = 0.0, 0.0 for i in range(len(cycle)): u = cycle[i] v = cycle[(i + 1) % len(cycle)] if (u, v) in self._edge_map: edges = self._edge_map[(u, v)] cost, time = min(edges, key=lambda ct: ct[0] / ct[1]) sum_cost += cost sum_time += time else: # Fallback: scan through edges edge_found = False for edge in self._edges: if edge.u == u and edge.v == v: sum_cost += float(edge.cost) sum_time += float(edge.time) edge_found = True break if not edge_found: raise RuntimeError(f"Edge {u} -> {v} not found in cycle") return sum_cost, sum_time def _build_edge_map(self): """ Build edge lookup map for fast cycle weight computation. """ self._edge_map = {} for edge in self._edges: key = (edge.u, edge.v) cost, time = float(edge.cost), float(edge.time) self._edge_map.setdefault(key, []).append((cost, time)) # ============================== # Approximate Mode Implementation # ============================== def _solve_approximate( self, target_ratio: float | None = None, epsilon: float = 0.01, max_time: float | None = None, **kwargs, ) -> SolverResult: """ Fast approximation algorithm for very large graphs. """ max_time = max_time or self.config.max_solve_time or 60.0 start_time = time.time() try: # Simple approximation: sample random starting points and do limited search best_result = None best_ratio = float("inf") # Try multiple random starts n_samples = min(10, self.n) sample_vertices = np.random.choice(self.n, size=n_samples, replace=False) for start_v in sample_vertices: if time.time() - start_time > max_time: break # Limited depth search from this vertex cycle, ratio = self._limited_search_from_vertex( start_v, max_depth=min(20, self.n) ) if cycle and ratio < best_ratio: best_ratio = ratio sum_cost, sum_time = self._compute_cycle_weights_float(cycle) best_result = (cycle, sum_cost, sum_time, ratio) # Early termination if good enough if target_ratio is not None and ratio <= target_ratio * ( 1 + epsilon ): break if best_result is None: return SolverResult( cycle=[], sum_cost=0, sum_time=0, ratio=float("inf"), success=False, error_message="Approximation found no cycles", ) cycle, sum_cost, sum_time, ratio = best_result # Close cycle if cycle and cycle[0] != cycle[-1]: cycle.append(cycle[0]) return SolverResult( cycle=cycle, sum_cost=sum_cost, sum_time=sum_time, ratio=ratio, success=True, ) except Exception as e: return SolverResult( cycle=[], sum_cost=0, sum_time=0, ratio=float("inf"), success=False, error_message=f"Approximation failed: {str(e)}", ) def _limited_search_from_vertex( self, start: int, max_depth: int ) -> tuple[list[int] | None, float]: """ Limited depth search for cycles from a starting vertex. """ if not hasattr(self, "_adj_list") or self._adj_list is None: self._build_adjacency_list() best_cycle = None best_ratio = float("inf") def dfs(path: list[int], visited: set, depth: int): nonlocal best_cycle, best_ratio if depth >= max_depth: return current = path[-1] for next_v, cost, time in self._adj_list[current]: if next_v == start and len(path) >= 2: # Found cycle back to start cycle = path[:] total_cost = sum( c for _, c, _ in [ (self._get_edge_weights(path[i], path[i + 1])) for i in range(len(path) - 1) ] + [(cost, time)] ) total_time = sum( t for _, t in [ (self._get_edge_weights(path[i], path[i + 1])[1]) for i in range(len(path) - 1) ] + [time] ) if total_time > 0: ratio = total_cost / total_time if ratio < best_ratio: best_ratio = ratio best_cycle = cycle elif next_v not in visited and depth < max_depth - 1: dfs(path + [next_v], visited | {next_v}, depth + 1) dfs([start], {start}, 0) return best_cycle, best_ratio def _build_adjacency_list(self): """ Build adjacency list representation. """ self._adj_list = [[] for _ in range(self.n)] for edge in self._edges: cost, time = float(edge.cost), float(edge.time) self._adj_list[edge.u].append((edge.v, cost, time)) def _get_edge_weights(self, u: int, v: int) -> tuple[float, float]: """ Get edge weights between two vertices. """ for edge in self._edges: if edge.u == u and edge.v == v: return float(edge.cost), float(edge.time) raise ValueError(f"No edge found from {u} to {v}") # ============================== # Result Validation and Repair # ============================== def _validate_result(self, result: SolverResult) -> SolverResult: """ Validate and potentially repair the result. """ if not result.success or not result.cycle: return result try: # Check cycle is properly closed if len(result.cycle) < 3: if self.config.repair_cycles: # Try to repair by extending result = self._repair_short_cycle(result) else: result.success = False result.error_message = "Cycle too short" return result # Verify cycle exists in graph valid, issues = self._verify_cycle_exists(result.cycle) if not valid: if self.config.repair_cycles: result = self._repair_missing_edges(result, issues) else: result.success = False result.error_message = f"Invalid cycle: {', '.join(issues)}" return result # Verify weights are correct computed_cost, computed_time = self._compute_cycle_weights_float( result.cycle[:-1] ) cost_error = abs(computed_cost - float(result.sum_cost)) time_error = abs(computed_time - float(result.sum_time)) ratio_error = abs(computed_cost / computed_time - float(result.ratio)) tolerance = 1e-10 if isinstance(result.ratio, (int, Fraction)) else 1e-6 if ( cost_error > tolerance or time_error > tolerance or ratio_error > tolerance ): self.logger.warning( "Weight validation failed: cost_err=%s, time_err=%s, ratio_err=%s", cost_error, time_error, ratio_error, ) if self.config.repair_cycles: # Update with correct weights result.sum_cost = computed_cost result.sum_time = computed_time result.ratio = computed_cost / computed_time else: result.success = False result.error_message = "Weight validation failed" return result return result except Exception as e: result.success = False result.error_message = f"Validation failed: {str(e)}" return result def _verify_cycle_exists(self, cycle: list[int]) -> tuple[bool, list[str]]: """ Verify all edges in cycle exist in graph. """ issues = [] for i in range(len(cycle) - 1): u, v = cycle[i], cycle[i + 1] # Check if edge exists edge_found = False for edge in self._edges: if edge.u == u and edge.v == v: edge_found = True break if not edge_found: issues.append(f"Missing edge {u} -> {v}") return len(issues) == 0, issues def _repair_short_cycle(self, result: SolverResult) -> SolverResult: """ Attempt to repair cycles that are too short. """ # Simple repair: if cycle has only 2 vertices, look for a longer path if len(result.cycle) == 3 and result.cycle[0] == result.cycle[2]: # This is just u -> v -> u, try to find u -> w -> v -> u u, v = result.cycle[0], result.cycle[1] # Find intermediate vertices for w in range(self.n): if w != u and w != v: if self._has_edge(u, w) and self._has_edge(w, v): new_cycle = [u, w, v, u] cost, time = self._compute_cycle_weights_float(new_cycle[:-1]) result.cycle = new_cycle result.sum_cost = cost result.sum_time = time result.ratio = cost / time return result return result def _repair_missing_edges( self, result: SolverResult, issues: list[str] ) -> SolverResult: """ Attempt to repair cycles with missing edges. """ # For now, just mark as failed - more sophisticated repair could be implemented result.success = False result.error_message = f"Could not repair cycle: {', '.join(issues)}" return result def _has_edge(self, u: int, v: int) -> bool: """ Check if edge u -> v exists. """ for edge in self._edges: if edge.u == u and edge.v == v: return True return False # ============================== # Utility and Debug Methods # ==============================
[docs] def get_debug_info(self) -> str: """ Get comprehensive debug information. """ info = ["Min Ratio Cycle Solver - Debug Info"] info.append("=" * 50) # Basic info info.append(f"Vertices: {self.n}") info.append(f"Edges: {len(self._edges)}") info.append(f"Integer weights: {self._all_int}") # Graph properties try: props = self.get_graph_properties() if "error" not in props: info.append(f"Density: {props['density']:.4f}") info.append(f"Is sparse: {props['is_sparse']}") conn = props["connectivity"] info.append(f"Isolated vertices: {conn['isolated_vertices']}") info.append( f"Avg degree: in={conn['avg_in_degree']:.2f}, out={conn['avg_out_degree']:.2f}" ) weights = props["weights"] info.append( f"Cost range: [{weights['cost_range'][0]:.3f}, {weights['cost_range'][1]:.3f}]" ) info.append( f"Time range: [{weights['time_range'][0]:.3f}, {weights['time_range'][1]:.3f}]" ) except Exception as e: info.append(f"Error analyzing graph: {e}") # Issues try: issues = self.detect_issues() if issues: info.append("\nPotential Issues:") for issue in issues: info.append(f" [{issue['level']}] {issue['message']}") else: info.append("\nNo issues detected") except Exception as e: info.append(f"\nError detecting issues: {e}") # Last result if self._last_result: result = self._last_result info.append("\nLast solve result:") info.append(f" Success: {result.success}") if result.success: info.append(f" Ratio: {result.ratio}") info.append(f" Cycle length: {len(result.cycle) - 1}") if result.metrics: info.append(f" Solve time: {result.metrics.solve_time:.4f}s") info.append(f" Mode: {result.metrics.mode_used}") info.append(f" Iterations: {result.metrics.iterations}") else: info.append(f" Error: {result.error_message}") return "\n".join(info)
# ============================== # Advanced Analytics Methods # ==============================
[docs] def sensitivity_analysis( self, edge_perturbations: list[dict[int, tuple[float, float]]] ) -> list[SolverResult]: """ Perform edge weight perturbation analysis. Args: edge_perturbations: List of scenarios, each mapping an edge index to a ``(delta_cost, delta_time)`` tuple. For each scenario the graph is perturbed and the solver is run again. Returns: List of :class:`SolverResult` objects, one per scenario. """ baseline = [Edge(e.u, e.v, e.cost, e.time) for e in self._edges] results: list[SolverResult] = [] for scenario in edge_perturbations: for idx, (dc, dt) in scenario.items(): if idx < 0 or idx >= len(self._edges): raise IndexError("edge index out of range") e = self._edges[idx] self._edges[idx] = Edge(e.u, e.v, e.cost + dc, e.time + dt) self._arrays_built = False try: results.append(self.solve()) finally: self._edges = [Edge(e.u, e.v, e.cost, e.time) for e in baseline] self._arrays_built = False return results
[docs] def stability_region(self, epsilon: float = 0.01) -> dict[int, bool]: """ Estimate stability of the optimal cycle under small perturbations. Each edge is perturbed by ``epsilon`` fraction of its cost and time and the solver is executed again. If the optimal cycle remains unchanged the edge is considered stable. Args: epsilon: Relative perturbation size (default 1%). Returns: Mapping from edge index to a boolean indicating stability. """ if self._last_result is None: baseline_res = self.solve() else: baseline_res = self._last_result baseline_cycle = baseline_res.cycle stability: dict[int, bool] = {} for idx, e in enumerate(self._edges): dc = float(e.cost) * epsilon dt = float(e.time) * epsilon perturbed = {idx: (dc, dt)} res_list = self.sensitivity_analysis([perturbed]) stability[idx] = res_list[0].cycle == baseline_cycle return stability
[docs] def visualize_solution( self, show_cycle: bool = True, layout: str = "spring" ) -> tuple[plt.Figure, plt.Axes]: """ Visualize the current graph and optionally highlight the solution. Args: show_cycle: If ``True`` highlight the most recent solution cycle. layout: Layout function name from ``networkx`` (e.g. ``"spring"``). Returns: Matplotlib ``Figure`` and ``Axes`` objects. """ if not self._edges: raise ValueError("No edges to visualize") G = nx.DiGraph() for idx, e in enumerate(self._edges): G.add_edge(e.u, e.v, index=idx, cost=float(e.cost), time=float(e.time)) layout_fn = getattr(nx, f"{layout}_layout") pos = layout_fn(G) fig, ax = plt.subplots() nx.draw(G, pos, ax=ax, with_labels=True, node_color="lightgray") if show_cycle and self._last_result and self._last_result.cycle: cycle_edges = list( zip(self._last_result.cycle, self._last_result.cycle[1:]) ) nx.draw_networkx_edges( G, pos, ax=ax, edgelist=cycle_edges, edge_color="red", width=2.5 ) ax.set_title("Min Ratio Cycle Graph") return fig, ax
[docs] def create_interactive_plot(self) -> plt.Figure: """ Create an interactive matplotlib plot of the current graph. """ fig, ax = self.visualize_solution() def _on_click(event: Any) -> None: if event.inaxes: ax.set_title(f"Clicked at ({event.xdata:.2f}, {event.ydata:.2f})") fig.canvas.draw_idle() fig.canvas.mpl_connect("button_press_event", _on_click) return fig
[docs] def health_check(self) -> dict[str, Any]: """ Run comprehensive health check. """ checks = [] # Check NumPy version try: import numpy as np np_version = np.__version__ min_version = "1.20.0" from packaging import version np_ok = version.parse(np_version) >= version.parse(min_version) checks.append( { "name": "numpy_version", "status": "PASS" if np_ok else "FAIL", "details": f"NumPy {np_version} (required: >={min_version})", "recommendation": "Update NumPy" if not np_ok else None, } ) except Exception as e: checks.append( { "name": "numpy_version", "status": "ERROR", "details": str(e), "recommendation": "Install NumPy", } ) # Check graph state if not self._edges: checks.append( { "name": "graph_state", "status": "WARN", "details": "No edges in graph", "recommendation": "Add edges before solving", } ) else: issues = self.detect_issues() error_count = sum(1 for issue in issues if issue["level"] == "ERROR") warn_count = sum(1 for issue in issues if issue["level"] == "WARNING") if error_count > 0: status = "FAIL" details = f"{error_count} critical issues found" recommendation = "Fix graph structure issues" elif warn_count > 0: status = "WARN" details = f"{warn_count} warnings found" recommendation = "Review warnings" else: status = "PASS" details = "Graph structure looks good" recommendation = None checks.append( { "name": "graph_state", "status": status, "details": details, "recommendation": recommendation, } ) # Memory check try: import psutil available_gb = psutil.virtual_memory().available / (1024**3) min_memory = 0.5 # 500MB minimum checks.append( { "name": "memory", "status": "PASS" if available_gb >= min_memory else "WARN", "details": f"{available_gb:.1f}GB available", "recommendation": ( "Consider freeing memory" if available_gb < min_memory else None ), } ) except ImportError: checks.append( { "name": "memory", "status": "SKIP", "details": "psutil not available", "recommendation": "Install psutil for memory monitoring", } ) # Summary total = len(checks) passed = sum(1 for c in checks if c["status"] == "PASS") warnings = sum(1 for c in checks if c["status"] == "WARN") failures = sum(1 for c in checks if c["status"] == "FAIL") return { "timestamp": time.time(), "checks": checks, "summary": { "total": total, "passed": passed, "warnings": warnings, "failures": failures, "overall_status": ( "FAIL" if failures > 0 else ("WARN" if warnings > 0 else "PASS") ), }, }
def __repr__(self) -> str: return f"MinRatioCycleSolver(n_vertices={self.n}, n_edges={len(self._edges)}, integer_mode={self._all_int})"
# ============================== # Convenience Functions # ==============================
[docs] def solve_min_ratio_cycle( edges: list[tuple[int, int, float | int, float | int]], n_vertices: int | None = None, mode: SolverMode | str = SolverMode.AUTO, config: SolverConfig | None = None, ) -> SolverResult: """ Convenience function to solve min ratio cycle problem. Args: edges: List of (u, v, cost, time) tuples n_vertices: Number of vertices (auto-detected if None) mode: Solver mode config: Solver configuration Returns: SolverResult with solution """ if n_vertices is None: # Auto-detect number of vertices vertices = set() for u, v, _, _ in edges: vertices.add(u) vertices.add(v) n_vertices = max(vertices) + 1 if vertices else 1 solver = MinRatioCycleSolver(n_vertices, config) solver.add_edges(edges) return solver.solve(mode=mode)
# ============================== # Example Usage # ============================== if __name__ == "__main__": # Example 1: Basic integer weights (exact mode) print("Example 1: Basic integer weights") solver1 = MinRatioCycleSolver(3) solver1.add_edge(0, 1, 2, 1) # cost=2, time=1 solver1.add_edge(1, 2, 3, 2) # cost=3, time=2 solver1.add_edge(2, 0, 1, 1) # cost=1, time=1 result1 = solver1.solve() print(f"Result: {result1.cycle}, ratio = {result1.ratio}") print(f"Mode used: {result1.metrics.mode_used if result1.metrics else 'unknown'}") # Example 2: Float weights (numeric mode) print("\nExample 2: Float weights") solver2 = MinRatioCycleSolver(3) solver2.add_edge(0, 1, 2.5, 1.0) solver2.add_edge(1, 2, 3.0, 2.0) solver2.add_edge(2, 0, 1.0, 1.0) result2 = solver2.solve() print(f"Result: {result2.cycle}, ratio = {result2.ratio:.6f}") print(f"Mode used: {result2.metrics.mode_used if result2.metrics else 'unknown'}") # Example 3: Using configuration print("\nExample 3: Custom configuration") config = SolverConfig( log_level=LogLevel.DEBUG, enable_preprocessing=True, validate_cycles=True ) solver3 = MinRatioCycleSolver(4, config) edges = [ (0, 1, 5, 2), (1, 2, 3, 1), (2, 3, 2, 2), (3, 0, 1, 1), (0, 2, 10, 3), # Alternative path ] solver3.add_edges(edges) result3 = solver3.solve(mode=SolverMode.NUMERIC) print(f"Result: {result3.cycle}, ratio = {result3.ratio:.6f}") if result3.success and result3.metrics: print(f"Solve time: {result3.metrics.solve_time:.4f}s") print(f"Iterations: {result3.metrics.iterations}") print(f"Preprocessing time: {result3.metrics.preprocessing_time:.6f}s") # Example 4: Health check and debugging print("\nExample 4: Health check") health = solver3.health_check() print(f"Overall health: {health['summary']['overall_status']}") print(f"Checks: {health['summary']['passed']}/{health['summary']['total']} passed") # Example 5: Convenience function print("\nExample 5: Convenience function") edges_list = [ (0, 1, -2, 1), # Negative cost cycle (1, 2, -1, 1), (2, 0, 1, 1), ] result5 = solve_min_ratio_cycle(edges_list) print(f"Convenience result: ratio = {result5.ratio:.6f}") # Example 6: Approximate mode for large graphs print("\nExample 6: Approximate mode") solver6 = MinRatioCycleSolver(10) # Add many random edges import random # nosec B311 random.seed(42) for _ in range(30): u, v = random.randint(0, 9), random.randint(0, 9) # nosec B311 cost = random.randint(-5, 10) # nosec B311 time = random.randint(1, 5) # nosec B311 solver6.add_edge(u, v, cost, time) result6 = solver6.solve(mode=SolverMode.APPROXIMATE) print(f"Approximate result: ratio = {result6.ratio:.6f}") print(f"Success: {result6.success}") # Example 7: Debug information print("\nExample 7: Debug info") debug_info = solver1.get_debug_info() print(debug_info)