22 KiB
22 KiB
| name | description | argument-hint | allowed-tools |
|---|---|---|---|
| lp-solver | Integrates PuLP/Xpress MIP solvers into league-planner with proper configuration, Celery task wrapping, progress callbacks, and result handling. Use for optimization tasks. | <optimization-type> | Read, Write, Edit, Glob, Grep |
League-Planner Solver Integration
Integrates Mixed-Integer Programming (MIP) solvers (PuLP with CBC, or FICO Xpress) into the league-planner system following project patterns: solver configuration, Celery task wrapping, progress reporting, and graceful degradation.
When to Use
- Creating new optimization models (scheduling, draws, assignments)
- Integrating solver output with Django models
- Implementing progress reporting for long-running optimizations
- Configuring solver parameters for performance tuning
Prerequisites
- PuLP installed: `pip install "pulp>=2.7"`
- Xpress installed (optional): requires a license and the `xpress` package
- Solver submodules cloned: `scheduler/solver/`, `draws/solver/`
- Environment variable `SOLVER` set to `pulp` or `xpress`
Instructions
Step 1: Environment Configuration
# leagues/settings.py
import os

# Which MIP backend to use: 'pulp' (default, free) or 'xpress' (commercial, faster).
SOLVER = os.getenv('SOLVER', 'pulp')

# How optimizations are executed: 'local' (synchronous) or 'celery' (async).
RUN_MODE = os.getenv('RUN_MODE', 'local')

# Per-backend solver parameters, keyed by the value of SOLVER.
SOLVER_SETTINGS = dict(
    pulp=dict(
        solver='CBC',      # or 'GLPK', 'COIN_CMD'
        msg=False,
        timeLimit=3600,    # 1 hour
        gapRel=0.01,       # 1% optimality gap
    ),
    xpress=dict(
        maxtime=3600,
        miprelstop=0.01,
        threads=4,
        presolve=1,
    ),
)
Step 2: Create Solver Module
# scheduler/solver/optimizer.py
from __future__ import annotations
import logging
from typing import Callable, Any
from dataclasses import dataclass
from django.conf import settings
logger = logging.getLogger('custom')
@dataclass
class OptimizationResult:
    """Outcome of a single solver run, independent of the backend used."""

    # One of 'optimal', 'feasible', 'infeasible', 'timeout', 'error'
    status: str
    # Objective value of the reported solution, if any
    objective_value: float | None
    # Selected variable values, grouped by variable family name
    solution: dict[str, Any]
    # Wall-clock solve duration in seconds
    solve_time: float
    # Relative optimality gap when the backend exposes it
    gap: float | None
    # Iteration count when the backend exposes it
    iterations: int

    def is_success(self) -> bool:
        """Return True when the run produced a usable solution."""
        return self.status == 'optimal' or self.status == 'feasible'

    def summary(self) -> dict:
        """Return the headline figures of the run as a plain dict."""
        return dict(
            status=self.status,
            objective=self.objective_value,
            solve_time=self.solve_time,
            gap=self.gap,
        )
class BaseOptimizer:
    """Common scaffolding shared by all solver back-ends.

    Subclasses provide ``build_model``, ``solve`` and ``extract_solution``;
    this class only stores the callbacks and the model containers.
    """

    def __init__(
        self,
        name: str,
        progress_callback: Callable[[int, str], None] | None = None,
        abort_check: Callable[[], bool] | None = None,
    ):
        # Missing (or falsy) hooks are replaced by no-op stand-ins so the rest
        # of the class can call them unconditionally.
        if not progress_callback:
            progress_callback = lambda _percent, _message: None
        if not abort_check:
            abort_check = lambda: False

        self.name = name
        self.progress_callback = progress_callback
        self.abort_check = abort_check
        self.model = None
        self.variables = {}
        self.constraints = []

    def report_progress(self, percent: int, message: str):
        """Forward a progress update to the callback, then log it."""
        self.progress_callback(percent, message)
        log_line = f"[{self.name}] {percent}% - {message}"
        logger.info(log_line)

    def check_abort(self) -> bool:
        """Return whatever the abort hook currently reports."""
        should_stop = self.abort_check()
        return should_stop

    def build_model(self, data: dict) -> None:
        """Construct the optimization model from ``data`` (subclass hook)."""
        raise NotImplementedError

    def solve(self) -> OptimizationResult:
        """Run the solver on the built model (subclass hook)."""
        raise NotImplementedError

    def extract_solution(self) -> dict:
        """Read the solution out of the solved model (subclass hook)."""
        raise NotImplementedError
class PuLPOptimizer(BaseOptimizer):
    """Optimizer using PuLP with the CBC solver.

    The example model assigns every match to exactly one (day, slot) pair and
    minimizes the summed assignment cost taken from ``data['costs']``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Imported here rather than at module level so the module can be
        # imported without PuLP installed; the module is kept on the instance.
        import pulp
        self.pulp = pulp

    def build_model(self, data: dict) -> None:
        """Build the PuLP model from ``data``.

        Args:
            data: Dict with optional keys ``matches``, ``days``, ``slots``
                (iterables of hashable ids) and ``costs`` (mapping of
                ``(match, day, slot)`` to a numeric cost). Missing keys are
                treated as empty; missing cost entries count as 0.
        """
        self.report_progress(10, "Building optimization model...")

        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])
        # Read like the other keys so a payload without costs does not raise.
        costs = data.get('costs', {})

        self.model = self.pulp.LpProblem(self.name, self.pulp.LpMinimize)

        # x[m, d, s] = 1 if match m is assigned to day d, slot s
        x = self.pulp.LpVariable.dicts(
            'x',
            ((m, d, s) for m in matches for d in days for s in slots),
            cat='Binary',
        )
        self.variables['x'] = x

        self.report_progress(30, "Adding constraints...")

        # Each match is assigned exactly once.
        for m in matches:
            self.model += (
                self.pulp.lpSum(x[m, d, s] for d in days for s in slots) == 1,
                f"assign_match_{m}",
            )

        self.report_progress(50, "Setting objective function...")

        # Minimize the total assignment cost (e.g. travel distance).
        self.model += self.pulp.lpSum(
            x[m, d, s] * costs.get((m, d, s), 0)
            for m in matches for d in days for s in slots
        )

    def solve(self) -> OptimizationResult:
        """Solve the PuLP model and translate the outcome.

        Returns:
            An ``OptimizationResult``. ``status`` is ``'optimal'`` for a proven
            optimum and ``'feasible'`` when the solver stopped with an integer
            solution that is not proven optimal; otherwise it is the mapped
            PuLP status (``'not_solved'``, ``'infeasible'``, ``'unbounded'``,
            ``'undefined'``) or ``'unknown'``. Any exception raised while
            solving is logged and reported as ``status='error'``.
        """
        import time

        self.report_progress(60, "Solving optimization model...")

        solver_settings = settings.SOLVER_SETTINGS.get('pulp', {})
        if solver_settings.get('solver') == 'CBC':
            solver = self.pulp.PULP_CBC_CMD(
                msg=solver_settings.get('msg', False),
                timeLimit=solver_settings.get('timeLimit', 3600),
                gapRel=solver_settings.get('gapRel', 0.01),
            )
        else:
            # Any other configured value falls back to PuLP's default solver.
            solver = None

        start_time = time.time()
        try:
            status = self.model.solve(solver)
            solve_time = time.time() - start_time

            status_map = {
                self.pulp.LpStatusOptimal: 'optimal',
                self.pulp.LpStatusNotSolved: 'not_solved',
                self.pulp.LpStatusInfeasible: 'infeasible',
                self.pulp.LpStatusUnbounded: 'unbounded',
                self.pulp.LpStatusUndefined: 'undefined',
            }

            # The problem status alone cannot distinguish a proven optimum
            # from an incumbent found before a limit was hit; the solution
            # status can, so it decides between 'optimal' and 'feasible'.
            sol_status = self.model.sol_status
            has_solution = sol_status in (
                self.pulp.LpSolutionOptimal,
                self.pulp.LpSolutionIntegerFeasible,
            )
            if sol_status == self.pulp.LpSolutionIntegerFeasible:
                label = 'feasible'
            else:
                label = status_map.get(status, 'unknown')

            return OptimizationResult(
                status=label,
                # Only report an objective when there is a solution behind it.
                objective_value=(
                    self.pulp.value(self.model.objective) if has_solution else None
                ),
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                gap=None,  # CBC doesn't easily expose gap
                iterations=0,
            )
        except Exception as e:
            # Boundary handler: callers receive a result object, never an
            # exception; the traceback is kept in the log.
            logger.exception(f"Solver error: {e}")
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.time() - start_time,
                gap=None,
                iterations=0,
            )

    def extract_solution(self) -> dict:
        """Return the selected variables of the solved model.

        Returns:
            ``{family_name: {key: value}}`` containing only variables whose
            value is set and greater than 0.5 (i.e. binaries that are "on").
        """
        solution = {}
        for var_name, var_dict in self.variables.items():
            solution[var_name] = {
                key: var.varValue
                for key, var in var_dict.items()
                if var.varValue is not None and var.varValue > 0.5
            }
        return solution
class XpressOptimizer(BaseOptimizer):
    """Optimizer using the FICO Xpress solver.

    Builds the same example model as the PuLP variant: every match is
    assigned to exactly one (day, slot) pair at minimum total cost.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            import xpress as xp
        except ImportError as err:
            # Same exception type as before (callers catch ImportError), now
            # chained so the underlying cause stays visible.
            raise ImportError(
                "Xpress solver not available. Install with: pip install xpress"
            ) from err
        self.xp = xp

    def build_model(self, data: dict) -> None:
        """Build the Xpress model from ``data``.

        Args:
            data: Dict with optional keys ``matches``, ``days``, ``slots``
                (iterables of hashable ids) and ``costs`` (mapping of
                ``(match, day, slot)`` to a numeric cost). Missing keys are
                treated as empty; missing cost entries count as 0.
        """
        self.report_progress(10, "Building Xpress model...")

        self.model = self.xp.problem(name=self.name)

        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])
        # Read like the other keys so a payload without costs does not raise.
        costs = data.get('costs', {})

        # One binary variable per (match, day, slot) combination.
        x = {
            (m, d, s): self.xp.var(vartype=self.xp.binary, name=f'x_{m}_{d}_{s}')
            for m in matches for d in days for s in slots
        }
        self.variables['x'] = x
        self.model.addVariable(*x.values())

        self.report_progress(30, "Adding constraints...")

        # Each match is assigned exactly once.
        for m in matches:
            self.model.addConstraint(
                self.xp.Sum(x[m, d, s] for d in days for s in slots) == 1
            )

        self.report_progress(50, "Setting objective...")

        self.model.setObjective(
            self.xp.Sum(
                x[m, d, s] * costs.get((m, d, s), 0)
                for m in matches for d in days for s in slots
            ),
            sense=self.xp.minimize,
        )

    def solve(self) -> OptimizationResult:
        """Solve the Xpress model and translate the outcome.

        Returns:
            An ``OptimizationResult``. The solution and objective are only
            read when the status is ``mip_optimal`` or ``mip_solution``, so
            an infeasible or unbounded model is reported with its own status
            instead of failing while reading a non-existent solution. Any
            exception raised while solving is logged and reported as
            ``status='error'``.
        """
        import time

        self.report_progress(60, "Solving with Xpress...")

        solver_settings = settings.SOLVER_SETTINGS.get('xpress', {})

        controls = self.model.controls
        controls.maxtime = solver_settings.get('maxtime', 3600)
        controls.miprelstop = solver_settings.get('miprelstop', 0.01)
        controls.threads = solver_settings.get('threads', 4)
        # Applied only when configured, leaving the solver default otherwise.
        if 'presolve' in solver_settings:
            controls.presolve = solver_settings['presolve']

        start_time = time.time()
        try:
            self.model.solve()
            solve_time = time.time() - start_time

            status_code = self.model.getProbStatus()
            status_map = {
                self.xp.mip_optimal: 'optimal',
                self.xp.mip_solution: 'feasible',
                self.xp.mip_infeas: 'infeasible',
                self.xp.mip_unbounded: 'unbounded',
            }
            has_solution = status_code in (self.xp.mip_optimal, self.xp.mip_solution)

            return OptimizationResult(
                status=status_map.get(status_code, 'unknown'),
                objective_value=self.model.getObjVal() if has_solution else None,
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                gap=self._read_attrib('miprelgap', None),
                iterations=self._read_attrib('simplexiter', 0),
            )
        except Exception as e:
            # Boundary handler: callers receive a result object, never an
            # exception; the traceback is kept in the log.
            logger.exception(f"Xpress error: {e}")
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.time() - start_time,
                gap=None,
                iterations=0,
            )

    def _read_attrib(self, attrib_name: str, default):
        """Best-effort read of a solver attribute.

        Returns ``default`` when the attribute cannot be read, so that a
        missing statistic does not turn a successful solve into an error.
        """
        try:
            return self.model.getAttrib(attrib_name)
        except Exception as e:
            logger.debug(f"Xpress attribute {attrib_name!r} unavailable: {e}")
            return default

    def extract_solution(self) -> dict:
        """Return the selected variables of the solved model.

        Returns:
            ``{family_name: {key: value}}`` containing only variables whose
            solution value is greater than 0.5.
        """
        solution = {}
        for var_name, var_dict in self.variables.items():
            selected = {}
            for key, var in var_dict.items():
                # Query the solver once per variable and reuse the value.
                value = self.model.getSolution(var)
                if value > 0.5:
                    selected[key] = value
            solution[var_name] = selected
        return solution
def get_optimizer(name: str, **kwargs) -> BaseOptimizer:
    """Build the optimizer selected by ``settings.SOLVER``.

    Xpress is used only when explicitly requested; if it cannot be imported
    a warning is logged and the PuLP optimizer is returned instead.
    """
    if settings.SOLVER != 'xpress':
        return PuLPOptimizer(name, **kwargs)

    try:
        return XpressOptimizer(name, **kwargs)
    except ImportError:
        logger.warning("Xpress not available, falling back to PuLP")

    return PuLPOptimizer(name, **kwargs)
Step 3: Wrap in Celery Task
# scheduler/solver/tasks.py
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from django.db import transaction
from taskmanager.models import Task as TaskRecord
@shared_task(
    bind=True,
    name='scheduler.optimize_scenario',
    base=AbortableTask,
    time_limit=7200,  # 2 hours
    soft_time_limit=7000,
)
def task_optimize_scenario(
    self,
    scenario_id: int,
    user_id: int = None,
    options: dict = None,
) -> dict:
    """
    Run optimization for a scenario.

    Args:
        scenario_id: ID of scenario to optimize
        user_id: Optional user for notifications
        options: Solver options override

    Returns:
        dict with optimization results. ``status`` is the solver status on a
        normal run, ``'aborted'`` when an abort was requested between phases,
        or ``'error'`` (with ``error`` and ``traceback`` keys) when an
        exception was raised.
    """
    from scheduler.models import Scenario, OptimizationRun
    from scheduler.solver.optimizer import get_optimizer

    options = options or {}

    # delivery_info may be None when the task function is called directly
    # (the synchronous RUN_MODE path) instead of being delivered by a broker.
    delivery_info = self.request.delivery_info or {}

    # Create task tracking record
    task_record = TaskRecord.objects.create(
        task_id=self.request.id,
        task_name='scheduler.optimize_scenario',
        scenario_id=scenario_id,
        user_id=user_id,
        queue=delivery_info.get('routing_key', 'celery'),
    )

    def progress_callback(percent: int, message: str):
        """Update progress in both Celery and TaskRecord."""
        self.update_state(
            state='PROGRESS',
            meta={'progress': percent, 'status': message}
        )
        task_record.update_progress(percent, message)

    def abort_check() -> bool:
        """Check if task should abort."""
        return self.is_aborted()

    def aborted_result() -> dict:
        """Payload returned whenever an abort request is honoured."""
        return {'status': 'aborted', 'scenario_id': scenario_id}

    try:
        # Load scenario with related data
        scenario = Scenario.objects.select_related(
            'season', 'season__league'
        ).prefetch_related(
            'matches__home_team',
            'matches__away_team',
            'days',
            'kick_off_times',
        ).get(pk=scenario_id)

        progress_callback(5, 'Preparing optimization data...')

        # Prepare data for solver
        data = prepare_optimization_data(scenario, options)

        if abort_check():
            return aborted_result()

        # Create optimizer
        optimizer = get_optimizer(
            name=f'scenario_{scenario_id}',
            progress_callback=progress_callback,
            abort_check=abort_check,
        )

        # Build and solve
        optimizer.build_model(data)

        if abort_check():
            return aborted_result()

        result = optimizer.solve()

        progress_callback(80, 'Processing results...')

        if abort_check():
            return aborted_result()

        # Save results if successful; both writes succeed or fail together.
        if result.is_success():
            with transaction.atomic():
                apply_solution_to_scenario(scenario, result.solution)

                # Create optimization run record
                OptimizationRun.objects.create(
                    scenario=scenario,
                    status=result.status,
                    objective_value=result.objective_value,
                    solve_time=result.solve_time,
                    gap=result.gap,
                    settings=options,
                )

        progress_callback(100, 'Complete')

        return {
            'status': result.status,
            'scenario_id': scenario_id,
            'objective': result.objective_value,
            'solve_time': result.solve_time,
            'gap': result.gap,
        }

    except Exception as e:
        import traceback
        task_record.update_progress(-1, f'Error: {str(e)}')
        return {
            'status': 'error',
            'scenario_id': scenario_id,
            'error': str(e),
            'traceback': traceback.format_exc(),
        }

    finally:
        # Close the tracking record on every exit path (success, abort,
        # error). Otherwise an aborted or failed run leaves it open.
        # NOTE(review): assumes mark_completed() is what makes a record stop
        # counting as "running" for start-up guards that filter on
        # completed_at - confirm against taskmanager.models.Task.
        task_record.mark_completed()
def prepare_optimization_data(scenario, options: dict) -> dict:
    """Collect the scenario's matches, days and slots into a solver payload.

    Returns a dict holding the id lists (``matches``, ``days``, ``slots``),
    the per-combination ``costs`` keyed by ``(match_id, day_id, slot_id)``,
    a ``match_data`` lookup from id to match object, and the ``options``
    passed in.
    """
    match_objs = list(scenario.matches.select_related('home_team', 'away_team'))
    day_objs = list(scenario.days.all())
    slot_objs = list(scenario.kick_off_times.all())

    # One cost entry per match/day/slot combination (distances, preferences, etc.)
    cost_table = {
        (match.id, day.id, slot.id): calculate_cost(match, day, slot, options)
        for match in match_objs
        for day in day_objs
        for slot in slot_objs
    }

    payload = {}
    payload['matches'] = [match.id for match in match_objs]
    payload['days'] = [day.id for day in day_objs]
    payload['slots'] = [slot.id for slot in slot_objs]
    payload['costs'] = cost_table
    payload['match_data'] = {match.id: match for match in match_objs}
    payload['options'] = options
    return payload
def calculate_cost(match, day, slot, options) -> float:
    """Return the cost of placing ``match`` on ``day`` in ``slot``.

    The cost is the weighted home/away distance (skipped when the weight is
    not positive) plus a penalty when the match has a preferred day and
    ``day`` is a different one.
    """
    total = 0.0

    # Distance component
    distance_weight = options.get('weight_distance', 1.0)
    if distance_weight > 0:
        from common.functions import dist
        total += distance_weight * dist(match.home_team, match.away_team)

    # Preference component
    preferred = getattr(match, 'preferred_day', None)
    if preferred and day.id != match.preferred_day_id:
        total += options.get('preference_penalty', 100.0)

    return total
def apply_solution_to_scenario(scenario, solution: dict):
    """Write the chosen day and kick-off slot of each match to the database.

    Reads the ``'x'`` family of ``solution`` (keys are
    ``(match_id, day_id, slot_id)``) and issues one update per entry whose
    value exceeds 0.5.
    """
    from scheduler.models import Match

    # Select all assignments first, then write them.
    chosen = [
        (match_id, day_id, slot_id)
        for (match_id, day_id, slot_id), value in solution.get('x', {}).items()
        if value > 0.5
    ]

    for match_id, day_id, slot_id in chosen:
        target = Match.objects.filter(pk=match_id)
        target.update(day_id=day_id, kick_off_time_id=slot_id)
Step 4: Trigger from Views
# scheduler/views_func.py
from django.http import JsonResponse
from django.conf import settings
from common.decorators import crud_decorator
def _parse_optimization_options(post) -> dict:
    """Parse and validate solver options from a POST mapping.

    Raises:
        ValueError: if a value cannot be converted, or a weight is not a
            finite number (NaN / infinity).
    """
    import math

    options = {}
    for key in ('weight_distance', 'weight_fairness'):
        try:
            value = float(post.get(key, 1.0))
        except (TypeError, ValueError):
            raise ValueError(f'{key} must be a number') from None
        if not math.isfinite(value):
            raise ValueError(f'{key} must be a finite number')
        options[key] = value

    try:
        options['time_limit'] = int(post.get('time_limit', 3600))
    except (TypeError, ValueError):
        raise ValueError('time_limit must be an integer') from None

    return options


@crud_decorator(require_edit=True)
def start_optimization(request, scenario_id: int):
    """Start optimization for a scenario.

    Returns a JSON response:
      * 400 if an optimization is already running for the scenario, or if
        the submitted options are not valid numbers;
      * ``{'status': 'started', 'task_id': ...}`` when queued via Celery;
      * the task's result dict when run synchronously.
    """
    from scheduler.models import Scenario
    from scheduler.solver.tasks import task_optimize_scenario

    scenario = Scenario.objects.get(pk=scenario_id)

    # Check if optimization is already running
    from taskmanager.models import Task as TaskRecord
    running = TaskRecord.objects.filter(
        scenario_id=scenario_id,
        task_name='scheduler.optimize_scenario',
        completed_at__isnull=True,
    ).exists()

    if running:
        return JsonResponse({
            'status': 'error',
            'message': 'Optimization already running for this scenario',
        }, status=400)

    # Get options from request; malformed input is a client error, not a 500.
    try:
        options = _parse_optimization_options(request.POST)
    except ValueError as exc:
        return JsonResponse({
            'status': 'error',
            'message': f'Invalid optimization options: {exc}',
        }, status=400)

    # Start task based on run mode
    if settings.RUN_MODE == 'celery':
        result = task_optimize_scenario.delay(
            scenario_id=scenario.pk,
            user_id=request.user.pk,
            options=options,
        )
        return JsonResponse({
            'status': 'started',
            'task_id': result.id,
            'message': 'Optimization started in background',
        })
    else:
        # Synchronous execution
        result = task_optimize_scenario(
            scenario_id=scenario.pk,
            user_id=request.user.pk,
            options=options,
        )
        return JsonResponse({
            'status': result.get('status'),
            'result': result,
        })
Patterns & Best Practices
Graceful Degradation
# scheduler/solver/__init__.py
def get_task_optimize():
    """Return the optimization task, or a stub when the solver is missing.

    If the solver task module cannot be imported, a warning is logged and a
    stand-in callable is returned that always reports an error payload.
    """
    try:
        from scheduler.solver.tasks import task_optimize_scenario
    except ImportError as e:
        import logging
        logging.warning(f"Solver module not available: {e}")

        # Stand-in that accepts any arguments and reports the missing solver.
        def dummy_task(*args, **kwargs):
            return {'status': 'error', 'message': 'Solver not configured'}

        return dummy_task
    else:
        return task_optimize_scenario


task_optimize = get_task_optimize()
Progress Callback Pattern
def create_progress_reporter(task, task_record):
    """Create a progress reporter function for the optimizer.

    Args:
        task: Bound Celery task; its ``update_state`` and ``request.id`` are used.
        task_record: Tracking record; its ``update_progress`` is called.

    Returns:
        ``report(percent, message)``, which publishes the progress to the
        Celery state, stores it on the record and logs it.
    """
    # Imported once here rather than on every report. ``timezone`` was never
    # imported before, so the first report raised NameError; the timestamp is
    # now produced with the standard library as a timezone-aware UTC value.
    import logging
    from datetime import datetime, timezone

    def report(percent: int, message: str):
        # Update Celery state
        task.update_state(
            state='PROGRESS',
            meta={
                'progress': percent,
                'status': message,
                'timestamp': datetime.now(timezone.utc).isoformat(),
            }
        )
        # Update database record
        task_record.update_progress(percent, message)
        # Log for monitoring
        logging.info(f"[{task.request.id}] {percent}% - {message}")

    return report
Solver Parameter Tuning
# Adjust parameters based on problem size
def get_solver_params(data: dict) -> dict:
    """Pick solver parameters from the size of the problem.

    The size is the number of match/day/slot combinations: fewer than
    10,000 is treated as small, fewer than 100,000 as medium, anything
    else as large.
    """
    variable_count = len(data['matches']) * len(data['days']) * len(data['slots'])

    # Small problem: tight gap, short limit
    if variable_count < 10000:
        return dict(timeLimit=300, gapRel=0.001, threads=2)

    # Medium problem
    if variable_count < 100000:
        return dict(timeLimit=1800, gapRel=0.01, threads=4)

    # Large problem: looser gap, more threads, presolve and heuristics on
    return dict(
        timeLimit=3600,
        gapRel=0.05,
        threads=8,
        presolve=1,
        heuristics=1,
    )
Common Pitfalls
- Memory issues: Large models can consume significant memory; use sparse data structures
- Timeout handling: Always set time limits and handle timeout results gracefully
- Integer infeasibility: Check for infeasible constraints before large solve attempts
- Missing abort checks: Long solves must periodically check for abort signals
- Transaction boundaries: Wrap solution application in atomic transactions
Verification
Test solver integration:
# In Django shell
from scheduler.solver.optimizer import get_optimizer, PuLPOptimizer

# Tiny instance: 3 matches, 2 days, 1 slot; the cost of an assignment is match * day
matches, days, slots = [1, 2, 3], [1, 2], [1]
data = {
    'matches': matches,
    'days': days,
    'slots': slots,
    'costs': {(m, d, s): m * d for m in matches for d in days for s in slots},
}

# Test PuLP
opt = PuLPOptimizer('test')
opt.build_model(data)
result = opt.solve()
print(result.status, result.objective_value)