---
name: lp-solver
description: Integrates PuLP/Xpress MIP solvers into league-planner with proper configuration, Celery task wrapping, progress callbacks, and result handling. Use for optimization tasks.
argument-hint:
allowed-tools: Read, Write, Edit, Glob, Grep
---

# League-Planner Solver Integration

Integrates Mixed-Integer Programming (MIP) solvers (PuLP with CBC, or FICO Xpress) into the league-planner system following project patterns: solver configuration, Celery task wrapping, progress reporting, and graceful degradation.

## When to Use

- Creating new optimization models (scheduling, draws, assignments)
- Integrating solver output with Django models
- Implementing progress reporting for long-running optimizations
- Configuring solver parameters for performance tuning

## Prerequisites

- PuLP installed: `pip install "pulp>=2.7"` (keep the quotes; an unquoted `>=` is treated as a shell redirect)
- Xpress installed (optional): requires license and `xpress` package
- Solver submodules cloned: `scheduler/solver/`, `draws/solver/`
- Environment variable `SOLVER` set to `pulp` or `xpress`

## Instructions

### Step 1: Environment Configuration

```python
# leagues/settings.py
import os

# Solver selection: 'pulp' (default, free) or 'xpress' (commercial, faster)
SOLVER = os.environ.get('SOLVER', 'pulp')

# Run mode: 'local' (synchronous) or 'celery' (async)
RUN_MODE = os.environ.get('RUN_MODE', 'local')

# Solver-specific settings
SOLVER_SETTINGS = {
    'pulp': {
        'solver': 'CBC',  # or 'GLPK', 'COIN_CMD'
        'msg': False,
        'timeLimit': 3600,  # 1 hour
        'gapRel': 0.01,  # 1% optimality gap
    },
    'xpress': {
        'maxtime': 3600,
        'miprelstop': 0.01,
        'threads': 4,
        'presolve': 1,
    },
}
```

### Step 2: Create Solver Module

```python
# scheduler/solver/optimizer.py
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from typing import Any, Callable

from django.conf import settings

logger = logging.getLogger('custom')


@dataclass
class OptimizationResult:
    """Container for optimization results."""

    # One of 'optimal', 'feasible', 'infeasible', 'timeout', 'error'; the
    # optimizers below may also report 'unbounded', 'not_solved',
    # 'undefined' or 'unknown'.
    status: str
    objective_value: float | None
    solution: dict[str, Any]
    solve_time: float
    gap: float | None
    iterations: int

    def is_success(self) -> bool:
        """Return True when a usable solution ('optimal' or 'feasible') exists."""
        return self.status in ('optimal', 'feasible')

    def summary(self) -> dict:
        """Return the headline figures (status, objective, solve time, gap)."""
        return {
            'status': self.status,
            'objective': self.objective_value,
            'solve_time': self.solve_time,
            'gap': self.gap,
        }


class BaseOptimizer:
    """Base class for optimization models.

    Subclasses implement ``build_model``, ``solve`` and ``extract_solution``.
    """

    def __init__(
        self,
        name: str,
        progress_callback: Callable[[int, str], None] | None = None,
        abort_check: Callable[[], bool] | None = None,
    ):
        """Store the model name and the optional progress/abort hooks.

        Args:
            name: Model name, also used as the log prefix.
            progress_callback: Called with (percent, message); defaults to a no-op.
            abort_check: Returns True when the run should stop; defaults to
                a function that always returns False.
        """
        self.name = name
        self.progress_callback = progress_callback or (lambda p, s: None)
        self.abort_check = abort_check or (lambda: False)
        self.model = None
        self.variables = {}
        self.constraints = []

    def report_progress(self, percent: int, message: str):
        """Report progress to the callback and to the log."""
        self.progress_callback(percent, message)
        logger.info(f"[{self.name}] {percent}% - {message}")

    def check_abort(self) -> bool:
        """Check if optimization should be aborted."""
        return self.abort_check()

    def build_model(self, data: dict) -> None:
        """Build the optimization model. Override in subclass."""
        raise NotImplementedError

    def solve(self) -> OptimizationResult:
        """Solve the model. Override in subclass."""
        raise NotImplementedError

    def extract_solution(self) -> dict:
        """Extract solution from solved model. Override in subclass."""
        raise NotImplementedError


class PuLPOptimizer(BaseOptimizer):
    """Optimizer using PuLP with CBC solver."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Imported lazily so the module can be loaded without PuLP installed.
        import pulp
        self.pulp = pulp

    def build_model(self, data: dict) -> None:
        """Build PuLP model from data.

        Args:
            data: Dict with 'matches', 'days' and 'slots' (lists of ids,
                each defaulting to empty) and a required 'costs' dict keyed
                by (match, day, slot); missing cost entries count as 0.
        """
        self.report_progress(10, "Building optimization model...")

        # Create model
        self.model = self.pulp.LpProblem(self.name, self.pulp.LpMinimize)

        # Example: Binary variables for match assignments
        # x[i,j,k] = 1 if match i is assigned to day j, slot k
        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])

        self.variables['x'] = self.pulp.LpVariable.dicts(
            'x',
            ((m, d, s) for m in matches for d in days for s in slots),
            cat='Binary',
        )

        self.report_progress(30, "Adding constraints...")

        # Each match assigned exactly once
        for m in matches:
            self.model += (
                self.pulp.lpSum(
                    self.variables['x'][m, d, s]
                    for d in days
                    for s in slots
                ) == 1,
                f"assign_match_{m}",
            )

        self.report_progress(50, "Setting objective function...")

        # Minimize total travel distance (example)
        self.model += self.pulp.lpSum(
            self.variables['x'][m, d, s] * data['costs'].get((m, d, s), 0)
            for m in matches
            for d in days
            for s in slots
        )

    def solve(self) -> OptimizationResult:
        """Solve the PuLP model.

        Returns:
            An OptimizationResult; any exception raised by the solver is
            logged and reported as status 'error' instead of propagating.
        """
        self.report_progress(60, "Solving optimization model...")

        # Configure solver
        solver_settings = settings.SOLVER_SETTINGS.get('pulp', {})

        if solver_settings.get('solver') == 'CBC':
            solver = self.pulp.PULP_CBC_CMD(
                msg=solver_settings.get('msg', False),
                timeLimit=solver_settings.get('timeLimit', 3600),
                gapRel=solver_settings.get('gapRel', 0.01),
            )
        else:
            solver = None  # Use default

        start_time = time.time()

        try:
            status = self.model.solve(solver)
            solve_time = time.time() - start_time

            status_map = {
                self.pulp.LpStatusOptimal: 'optimal',
                self.pulp.LpStatusNotSolved: 'not_solved',
                self.pulp.LpStatusInfeasible: 'infeasible',
                self.pulp.LpStatusUnbounded: 'unbounded',
                self.pulp.LpStatusUndefined: 'undefined',
            }
            has_solution = status == self.pulp.LpStatusOptimal
            status_name = status_map.get(status, 'unknown')

            # PuLP can report a CBC run that stopped early (e.g. on the time
            # limit) with an incumbent as LpStatusOptimal; sol_status is what
            # separates a proven optimum from a merely feasible solution.
            if (
                has_solution
                and self.model.sol_status == self.pulp.LpSolutionIntegerFeasible
            ):
                status_name = 'feasible'

            return OptimizationResult(
                status=status_name,
                objective_value=self.pulp.value(self.model.objective),
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                gap=None,  # CBC doesn't easily expose gap
                iterations=0,
            )

        except Exception as e:
            # Boundary handler: keep the traceback in the log, report 'error'.
            logger.exception(f"Solver error: {e}")
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.time() - start_time,
                gap=None,
                iterations=0,
            )

    def extract_solution(self) -> dict:
        """Extract solution values from solved model.

        Returns:
            {variable group name: {key: value}} containing only variables
            whose value is set and greater than 0.5.
        """
        solution = {}

        for var_name, var_dict in self.variables.items():
            solution[var_name] = {
                key: var.varValue
                for key, var in var_dict.items()
                if var.varValue is not None and var.varValue > 0.5
            }

        return solution


class XpressOptimizer(BaseOptimizer):
    """Optimizer using FICO Xpress solver."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            import xpress as xp
        except ImportError as exc:
            # Chain the cause so the original import failure stays visible.
            raise ImportError(
                "Xpress solver not available. Install with: pip install xpress"
            ) from exc
        self.xp = xp

    def build_model(self, data: dict) -> None:
        """Build Xpress model from data.

        Args:
            data: Same layout as for PuLPOptimizer.build_model.
        """
        self.report_progress(10, "Building Xpress model...")

        self.model = self.xp.problem(name=self.name)

        # Example variables
        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])

        # Create binary variables
        self.variables['x'] = {
            (m, d, s): self.xp.var(vartype=self.xp.binary, name=f'x_{m}_{d}_{s}')
            for m in matches
            for d in days
            for s in slots
        }
        self.model.addVariable(*self.variables['x'].values())

        self.report_progress(30, "Adding constraints...")

        # Each match assigned exactly once
        for m in matches:
            self.model.addConstraint(
                self.xp.Sum(self.variables['x'][m, d, s] for d in days for s in slots) == 1
            )

        self.report_progress(50, "Setting objective...")

        # Objective
        self.model.setObjective(
            self.xp.Sum(
                self.variables['x'][m, d, s] * data['costs'].get((m, d, s), 0)
                for m in matches
                for d in days
                for s in slots
            ),
            sense=self.xp.minimize,
        )

    def solve(self) -> OptimizationResult:
        """Solve the Xpress model.

        Returns:
            An OptimizationResult; any exception raised while solving is
            logged and reported as status 'error' instead of propagating.
        """
        self.report_progress(60, "Solving with Xpress...")

        solver_settings = settings.SOLVER_SETTINGS.get('xpress', {})

        # Set controls
        self.model.controls.maxtime = solver_settings.get('maxtime', 3600)
        self.model.controls.miprelstop = solver_settings.get('miprelstop', 0.01)
        self.model.controls.threads = solver_settings.get('threads', 4)

        start_time = time.time()

        try:
            self.model.solve()
            solve_time = time.time() - start_time

            # Get solution status
            status_code = self.model.getProbStatus()
            status_map = {
                self.xp.mip_optimal: 'optimal',
                self.xp.mip_solution: 'feasible',
                self.xp.mip_infeas: 'infeasible',
                self.xp.mip_unbounded: 'unbounded',
            }
            has_solution = status_code in (self.xp.mip_optimal, self.xp.mip_solution)
            has_attrib = hasattr(self.model, 'getAttrib')

            return OptimizationResult(
                status=status_map.get(status_code, 'unknown'),
                objective_value=self.model.getObjVal() if has_solution else None,
                # Only read variable values when a solution exists; otherwise
                # an infeasible run would surface as a generic 'error'.
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                gap=self.model.getAttrib('miprelgap') if has_attrib else None,
                iterations=self.model.getAttrib('simplexiter') if has_attrib else 0,
            )

        except Exception as e:
            # Boundary handler: keep the traceback in the log, report 'error'.
            logger.exception(f"Xpress error: {e}")
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.time() - start_time,
                gap=None,
                iterations=0,
            )

    def extract_solution(self) -> dict:
        """Extract solution from Xpress model.

        Returns:
            {variable group name: {key: value}} containing only variables
            whose solution value is greater than 0.5.
        """
        solution = {}

        for var_name, var_dict in self.variables.items():
            selected = {}
            for key, var in var_dict.items():
                value = self.model.getSolution(var)  # query the solver once per variable
                if value > 0.5:
                    selected[key] = value
            solution[var_name] = selected

        return solution


def get_optimizer(name: str, **kwargs) -> BaseOptimizer:
    """Factory function to get appropriate optimizer based on settings.

    Falls back to PuLP when settings.SOLVER is 'xpress' but the xpress
    package cannot be imported.
    """
    solver = settings.SOLVER

    if solver == 'xpress':
        try:
            return XpressOptimizer(name, **kwargs)
        except ImportError:
            logger.warning("Xpress not available, falling back to PuLP")
            return PuLPOptimizer(name, **kwargs)

    return PuLPOptimizer(name, **kwargs)
```

### Step 3: Wrap in Celery Task

```python
# scheduler/solver/tasks.py
import logging
import traceback

from celery import shared_task
from celery.contrib.abortable import AbortableTask
from django.db import transaction

from taskmanager.models import Task as TaskRecord

logger = logging.getLogger('custom')


@shared_task(
    bind=True,
    name='scheduler.optimize_scenario',
    base=AbortableTask,
    time_limit=7200,  # 2 hours
    soft_time_limit=7000,
)
def task_optimize_scenario(
    self,
    scenario_id: int,
    user_id: int = None,
    options: dict = None,
) -> dict:
    """
    Run optimization for a scenario.

    Args:
        scenario_id: ID of scenario to optimize
        user_id: Optional user for notifications
        options: Solver options override

    Returns:
        dict with optimization results
    """
    from scheduler.models import Scenario, OptimizationRun
    from scheduler.solver.optimizer import get_optimizer

    options = options or {}

    # delivery_info is None when the task function is called directly
    # (the RUN_MODE='local' path), so guard before reading the routing key.
    delivery_info = self.request.delivery_info or {}

    # Create task tracking record
    task_record = TaskRecord.objects.create(
        task_id=self.request.id,
        task_name='scheduler.optimize_scenario',
        scenario_id=scenario_id,
        user_id=user_id,
        queue=delivery_info.get('routing_key', 'celery'),
    )

    def progress_callback(percent: int, message: str):
        """Update progress in both Celery and TaskRecord."""
        self.update_state(
            state='PROGRESS',
            meta={'progress': percent, 'status': message}
        )
        task_record.update_progress(percent, message)

    def abort_check() -> bool:
        """Check if task should abort."""
        return self.is_aborted()

    # NOTE(review): the abort returns and the except branch below never call
    # task_record.mark_completed(). If that is what sets completed_at, the
    # "already running" check in start_optimization will keep rejecting new
    # runs for this scenario - confirm how TaskRecord models abort/failure.
    try:
        # Load scenario with related data
        scenario = Scenario.objects.select_related(
            'season', 'season__league'
        ).prefetch_related(
            'matches__home_team',
            'matches__away_team',
            'days',
            'kick_off_times',
        ).get(pk=scenario_id)

        progress_callback(5, 'Preparing optimization data...')

        # Prepare data for solver
        data = prepare_optimization_data(scenario, options)

        if abort_check():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        # Create optimizer
        optimizer = get_optimizer(
            name=f'scenario_{scenario_id}',
            progress_callback=progress_callback,
            abort_check=abort_check,
        )

        # Build and solve
        optimizer.build_model(data)

        if abort_check():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        result = optimizer.solve()

        progress_callback(80, 'Processing results...')

        if abort_check():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        # Save results if successful
        if result.is_success():
            with transaction.atomic():
                apply_solution_to_scenario(scenario, result.solution)

                # Create optimization run record
                OptimizationRun.objects.create(
                    scenario=scenario,
                    status=result.status,
                    objective_value=result.objective_value,
                    solve_time=result.solve_time,
                    gap=result.gap,
                    settings=options,
                )

        progress_callback(100, 'Complete')
        task_record.mark_completed()

        return {
            'status': result.status,
            'scenario_id': scenario_id,
            'objective': result.objective_value,
            'solve_time': result.solve_time,
            'gap': result.gap,
        }

    except Exception as e:
        # Top-level task boundary: log with traceback, then report the
        # failure in the return value instead of raising.
        logger.exception(f"Optimization failed for scenario {scenario_id}")
        task_record.update_progress(-1, f'Error: {str(e)}')
        return {
            'status': 'error',
            'scenario_id': scenario_id,
            'error': str(e),
            'traceback': traceback.format_exc(),
        }


def prepare_optimization_data(scenario, options: dict) -> dict:
    """Prepare data dictionary for solver.

    Returns:
        Dict with id lists under 'matches', 'days' and 'slots', a 'costs'
        dict keyed by (match id, day id, slot id), 'match_data' mapping
        match id to the match object, and the 'options' passed in.
    """
    matches = list(scenario.matches.select_related('home_team', 'away_team'))
    days = list(scenario.days.all())
    slots = list(scenario.kick_off_times.all())

    # Calculate costs (distances, preferences, etc.)
    costs = {}
    for match in matches:
        for day in days:
            for slot in slots:
                costs[(match.id, day.id, slot.id)] = calculate_cost(
                    match, day, slot, options
                )

    return {
        'matches': [m.id for m in matches],
        'days': [d.id for d in days],
        'slots': [s.id for s in slots],
        'costs': costs,
        'match_data': {m.id: m for m in matches},
        'options': options,
    }


def calculate_cost(match, day, slot, options) -> float:
    """Calculate assignment cost for a match-day-slot combination.

    Reads 'weight_distance' (default 1.0) and 'preference_penalty'
    (default 100.0) from options; ``slot`` is currently not used.
    """
    cost = 0.0

    # Distance component
    if options.get('weight_distance', 1.0) > 0:
        from common.functions import dist
        distance = dist(match.home_team, match.away_team)
        cost += options.get('weight_distance', 1.0) * distance

    # Preference component
    if hasattr(match, 'preferred_day') and match.preferred_day:
        if day.id != match.preferred_day_id:
            cost += options.get('preference_penalty', 100.0)

    return cost


def apply_solution_to_scenario(scenario, solution: dict):
    """Apply optimization solution to scenario matches.

    Args:
        scenario: Scenario being updated (not read here).
        solution: Solver output; solution['x'] maps
            (match id, day id, slot id) to the variable value.
    """
    from scheduler.models import Match

    x_values = solution.get('x', {})

    # One UPDATE per selected assignment (value above the 0.5 threshold)
    for (match_id, day_id, slot_id), value in x_values.items():
        if value > 0.5:
            Match.objects.filter(pk=match_id).update(
                day_id=day_id,
                kick_off_time_id=slot_id,
            )
```

### Step 4: Trigger from Views

```python
# scheduler/views_func.py
from django.http import JsonResponse
from django.conf import settings

from common.decorators import crud_decorator


@crud_decorator(require_edit=True)
def start_optimization(request, scenario_id: int):
    """Start optimization for a scenario.

    Returns a JsonResponse: 404 if the scenario does not exist, 400 if a
    run is already in progress or the posted options are not numeric,
    otherwise the started task id (celery mode) or the result (local mode).
    """
    from scheduler.models import Scenario
    from scheduler.solver.tasks import task_optimize_scenario
    from taskmanager.models import Task as TaskRecord

    try:
        scenario = Scenario.objects.get(pk=scenario_id)
    except Scenario.DoesNotExist:
        return JsonResponse({
            'status': 'error',
            'message': 'Scenario not found',
        }, status=404)

    # Check if optimization is already running
    running = TaskRecord.objects.filter(
        scenario_id=scenario_id,
        task_name='scheduler.optimize_scenario',
        completed_at__isnull=True,
    ).exists()

    if running:
        return JsonResponse({
            'status': 'error',
            'message': 'Optimization already running for this scenario',
        }, status=400)

    # Get options from request; POST values are untrusted text, so a
    # malformed number becomes a 400 instead of an unhandled ValueError.
    # NOTE(review): 'weight_fairness' and 'time_limit' are forwarded to the
    # task, but nothing in the solver code shown in this guide reads them -
    # confirm where they are meant to be applied.
    try:
        options = {
            'weight_distance': float(request.POST.get('weight_distance', 1.0)),
            'weight_fairness': float(request.POST.get('weight_fairness', 1.0)),
            'time_limit': int(request.POST.get('time_limit', 3600)),
        }
    except (TypeError, ValueError):
        return JsonResponse({
            'status': 'error',
            'message': 'Invalid optimization options',
        }, status=400)

    # Start task based on run mode
    if settings.RUN_MODE == 'celery':
        result = task_optimize_scenario.delay(
            scenario_id=scenario.pk,
            user_id=request.user.pk,
            options=options,
        )
        return JsonResponse({
            'status': 'started',
            'task_id': result.id,
            'message': 'Optimization started in background',
        })

    # Synchronous execution
    result = task_optimize_scenario(
        scenario_id=scenario.pk,
        user_id=request.user.pk,
        options=options,
    )
    return JsonResponse({
        'status': result.get('status'),
        'result': result,
    })
```

## Patterns & Best Practices

### Graceful Degradation

```python
# scheduler/solver/__init__.py
def get_task_optimize():
    """Get optimization task with graceful fallback."""
    try:
        from scheduler.solver.tasks import task_optimize_scenario
        return task_optimize_scenario
    except ImportError as e:
        import logging
        logging.warning(f"Solver module not available: {e}")

        # Return dummy task
        def dummy_task(*args, **kwargs):
            return {'status': 'error', 'message': 'Solver not configured'}

        return dummy_task


task_optimize = get_task_optimize()
```

### Progress Callback Pattern

```python
import logging

from django.utils import timezone


def create_progress_reporter(task, task_record):
    """Create a progress reporter function for the optimizer."""

    def report(percent: int, message: str):
        # Update Celery state
        task.update_state(
            state='PROGRESS',
            meta={
                'progress': percent,
                'status': message,
                'timestamp': timezone.now().isoformat(),
            }
        )

        # Update database record
        task_record.update_progress(percent, message)

        # Log for monitoring
        logging.info(f"[{task.request.id}] {percent}% - {message}")

    return report
```

### Solver Parameter Tuning

```python
# Adjust parameters based on problem size
def get_solver_params(data: dict) -> dict:
    """Get solver parameters based on problem size."""
    n_matches = len(data['matches'])
    n_days = len(data['days'])
    n_slots = len(data['slots'])
    n_vars = n_matches * n_days * n_slots

    if n_vars < 10000:
        # Small problem
        return {
            'timeLimit': 300,
            'gapRel': 0.001,
            'threads': 2,
        }
    elif n_vars < 100000:
        # Medium problem
        return {
            'timeLimit': 1800,
            'gapRel': 0.01,
            'threads': 4,
        }
    else:
        # Large problem
        return {
            'timeLimit': 3600,
            'gapRel': 0.05,
            'threads': 8,
            'presolve': 1,
            'heuristics': 1,
        }
```

## Common Pitfalls

- **Memory issues**: Large models can consume significant memory; use sparse data structures
- **Timeout handling**: Always set time limits and handle timeout results gracefully
- **Integer infeasibility**: Check for infeasible constraints before large solve attempts
- **Missing abort checks**: Long solves must periodically check for abort signals
- **Transaction boundaries**: Wrap solution application in atomic transactions

## Verification

Test solver integration:

```python
# In Django shell
from scheduler.solver.optimizer import get_optimizer, PuLPOptimizer

# Test PuLP
opt = PuLPOptimizer('test')
data = {
    'matches': [1, 2, 3],
    'days': [1, 2],
    'slots': [1],
    'costs': {(m, d, s): m * d for m in [1,2,3] for d in [1,2] for s in [1]},
}
opt.build_model(data)
result = opt.solve()
print(result.status, result.objective_value)
```