2026-02-04 15:49:25 +01:00

22 KiB

name: lp-solver
description: Integrates PuLP/Xpress MIP solvers into league-planner with proper configuration, Celery task wrapping, progress callbacks, and result handling. Use for optimization tasks.
argument-hint: <optimization-type>
allowed-tools: Read, Write, Edit, Glob, Grep

League-Planner Solver Integration

Integrates Mixed-Integer Programming (MIP) solvers (PuLP with CBC, or FICO Xpress) into the league-planner system following project patterns: solver configuration, Celery task wrapping, progress reporting, and graceful degradation.

When to Use

  • Creating new optimization models (scheduling, draws, assignments)
  • Integrating solver output with Django models
  • Implementing progress reporting for long-running optimizations
  • Configuring solver parameters for performance tuning

Prerequisites

  • PuLP installed: pip install "pulp>=2.7" (quote the requirement so the shell does not treat >= as a redirection)
  • Xpress installed (optional): requires license and xpress package
  • Solver submodules cloned: scheduler/solver/, draws/solver/
  • Environment variable SOLVER set to pulp or xpress

Instructions

Step 1: Environment Configuration

# leagues/settings.py

import os

# Which backend solves the models: 'pulp' (free, the default) or 'xpress'
# (commercial, faster). Read from the SOLVER environment variable.
SOLVER = os.getenv('SOLVER', 'pulp')

# How optimizations are executed: 'local' runs synchronously, 'celery' runs
# asynchronously. Read from the RUN_MODE environment variable.
RUN_MODE = os.getenv('RUN_MODE', 'local')

# Per-backend tuning knobs, keyed by the SOLVER value.
SOLVER_SETTINGS = {
    'pulp': {
        # Backend name; 'GLPK' and 'COIN_CMD' are the listed alternatives.
        'solver': 'CBC',
        'msg': False,
        'timeLimit': 60 * 60,  # seconds (1 hour)
        'gapRel': 0.01,  # stop at a 1% optimality gap
    },
    'xpress': {
        'maxtime': 60 * 60,
        'miprelstop': 0.01,
        'threads': 4,
        'presolve': 1,
    },
}

Step 2: Create Solver Module

# scheduler/solver/optimizer.py

from __future__ import annotations
import logging
from typing import Callable, Any
from dataclasses import dataclass
from django.conf import settings

logger = logging.getLogger('custom')


@dataclass
class OptimizationResult:
    """Container for optimization results."""
    status: str  # 'optimal', 'feasible', 'infeasible', 'timeout', 'error'
    objective_value: float | None
    solution: dict[str, Any]
    solve_time: float
    gap: float | None
    iterations: int

    def is_success(self) -> bool:
        return self.status in ('optimal', 'feasible')

    def summary(self) -> dict:
        return {
            'status': self.status,
            'objective': self.objective_value,
            'solve_time': self.solve_time,
            'gap': self.gap,
        }


class BaseOptimizer:
    """Base class for optimization models."""

    def __init__(
        self,
        name: str,
        progress_callback: Callable[[int, str], None] | None = None,
        abort_check: Callable[[], bool] | None = None,
    ):
        self.name = name
        self.progress_callback = progress_callback or (lambda p, s: None)
        self.abort_check = abort_check or (lambda: False)
        self.model = None
        self.variables = {}
        self.constraints = []

    def report_progress(self, percent: int, message: str):
        """Report progress to callback."""
        self.progress_callback(percent, message)
        logger.info(f"[{self.name}] {percent}% - {message}")

    def check_abort(self) -> bool:
        """Check if optimization should be aborted."""
        return self.abort_check()

    def build_model(self, data: dict) -> None:
        """Build the optimization model. Override in subclass."""
        raise NotImplementedError

    def solve(self) -> OptimizationResult:
        """Solve the model. Override in subclass."""
        raise NotImplementedError

    def extract_solution(self) -> dict:
        """Extract solution from solved model. Override in subclass."""
        raise NotImplementedError


class PuLPOptimizer(BaseOptimizer):
    """Optimizer that builds and solves the model with PuLP.

    PuLP is imported in ``__init__`` and kept on ``self.pulp``, so importing
    this module does not by itself require the package.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base optimizer and import PuLP.

        Raises:
            ImportError: If the ``pulp`` package is not installed.
        """
        super().__init__(*args, **kwargs)
        import pulp
        self.pulp = pulp

    def build_model(self, data: dict) -> None:
        """Build the PuLP assignment model from ``data``.

        Creates one binary variable per (match, day, slot), forces every match
        to be assigned exactly once and minimises the summed assignment cost.

        Args:
            data: Problem description. ``matches``, ``days`` and ``slots`` are
                iterables of ids; ``costs`` maps ``(match, day, slot)`` to a
                numeric cost. Every key is optional and a missing cost
                counts as 0.
        """
        self.report_progress(10, "Building optimization model...")

        self.model = self.pulp.LpProblem(self.name, self.pulp.LpMinimize)

        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])
        # Read 'costs' like the other keys so a missing table means "all
        # assignments are free" instead of a KeyError.
        costs = data.get('costs', {})

        # x[m, d, s] = 1 if match m is assigned to day d, slot s
        self.variables['x'] = self.pulp.LpVariable.dicts(
            'x',
            ((m, d, s) for m in matches for d in days for s in slots),
            cat='Binary'
        )
        x = self.variables['x']

        self.report_progress(30, "Adding constraints...")

        # Each match is assigned exactly once
        for m in matches:
            self.model += (
                self.pulp.lpSum(x[m, d, s] for d in days for s in slots) == 1,
                f"assign_match_{m}"
            )

        self.report_progress(50, "Setting objective function...")

        # Minimize the total assignment cost
        self.model += self.pulp.lpSum(
            x[m, d, s] * costs.get((m, d, s), 0)
            for m in matches for d in days for s in slots
        )

    def _status_label(self, status: int) -> str:
        """Translate a PuLP status code into an ``OptimizationResult`` status.

        PuLP reports ``LpStatusOptimal`` both for a proven optimum and for an
        incumbent found before the solver stopped early (e.g. on the time
        limit); ``sol_status`` tells the two apart.
        """
        pulp = self.pulp
        if status == pulp.LpStatusOptimal:
            sol_status = getattr(self.model, 'sol_status', pulp.LpSolutionOptimal)
            return 'optimal' if sol_status == pulp.LpSolutionOptimal else 'feasible'

        return {
            pulp.LpStatusNotSolved: 'not_solved',
            pulp.LpStatusInfeasible: 'infeasible',
            pulp.LpStatusUnbounded: 'unbounded',
            pulp.LpStatusUndefined: 'undefined',
        }.get(status, 'unknown')

    def solve(self) -> OptimizationResult:
        """Solve the model built by ``build_model``.

        Solver options come from ``settings.SOLVER_SETTINGS['pulp']``. Only
        the 'CBC' backend is configured explicitly; any other value falls back
        to PuLP's default solver with its default options.

        Returns:
            OptimizationResult: 'optimal' or 'feasible' with the extracted
            solution and objective when a solution exists; otherwise the
            mapped failure status with an empty solution and no objective.
            Any exception raised while solving is logged and reported as
            status 'error'.
        """
        import time

        self.report_progress(60, "Solving optimization model...")

        solver_settings = settings.SOLVER_SETTINGS.get('pulp', {})

        if solver_settings.get('solver') == 'CBC':
            solver = self.pulp.PULP_CBC_CMD(
                msg=solver_settings.get('msg', False),
                timeLimit=solver_settings.get('timeLimit', 3600),
                gapRel=solver_settings.get('gapRel', 0.01),
            )
        else:
            solver = None  # Use PuLP's default solver

        # perf_counter is monotonic, so the duration survives clock changes.
        start_time = time.perf_counter()

        try:
            status = self.model.solve(solver)
            solve_time = time.perf_counter() - start_time

            has_solution = status == self.pulp.LpStatusOptimal

            return OptimizationResult(
                status=self._status_label(status),
                # Without a solution the objective expression carries no
                # meaningful value, so report None (as the Xpress backend does).
                objective_value=(
                    self.pulp.value(self.model.objective) if has_solution else None
                ),
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                gap=None,  # CBC doesn't easily expose gap
                iterations=0,
            )

        except Exception as e:
            # Deliberate catch-all boundary: callers get an 'error' result
            # rather than an exception; the traceback goes to the log.
            logger.exception("Solver error: %s", e)
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.perf_counter() - start_time,
                gap=None,
                iterations=0,
            )

    def extract_solution(self) -> dict:
        """Collect the variables whose value exceeds 0.5.

        Returns:
            dict: ``{family_name: {index: value}}`` for every variable family
            in ``self.variables``; unset variables and values at or below 0.5
            are left out.
        """
        solution = {}

        for var_name, var_dict in self.variables.items():
            solution[var_name] = {
                key: var.varValue
                for key, var in var_dict.items()
                if var.varValue is not None and var.varValue > 0.5
            }

        return solution


class XpressOptimizer(BaseOptimizer):
    """Optimizer that builds and solves the model with FICO Xpress.

    The ``xpress`` package is imported in ``__init__`` and kept on ``self.xp``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base optimizer and import Xpress.

        Raises:
            ImportError: If the ``xpress`` package cannot be imported.
        """
        super().__init__(*args, **kwargs)
        try:
            import xpress as xp
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError(
                "Xpress solver not available. Install with: pip install xpress"
            ) from exc
        self.xp = xp

    def build_model(self, data: dict) -> None:
        """Build the Xpress assignment model from ``data``.

        Creates one binary variable per (match, day, slot), forces every match
        to be assigned exactly once and minimises the summed assignment cost.

        Args:
            data: Problem description. ``matches``, ``days`` and ``slots`` are
                iterables of ids; ``costs`` maps ``(match, day, slot)`` to a
                numeric cost. Every key is optional and a missing cost
                counts as 0.
        """
        self.report_progress(10, "Building Xpress model...")

        self.model = self.xp.problem(name=self.name)

        matches = data.get('matches', [])
        days = data.get('days', [])
        slots = data.get('slots', [])
        # Read 'costs' like the other keys so a missing table means "all
        # assignments are free" instead of a KeyError.
        costs = data.get('costs', {})

        # x[m, d, s] = 1 if match m is assigned to day d, slot s
        self.variables['x'] = {
            (m, d, s): self.xp.var(vartype=self.xp.binary, name=f'x_{m}_{d}_{s}')
            for m in matches for d in days for s in slots
        }
        x = self.variables['x']
        self.model.addVariable(*x.values())

        self.report_progress(30, "Adding constraints...")

        # Each match is assigned exactly once
        for m in matches:
            self.model.addConstraint(
                self.xp.Sum(x[m, d, s] for d in days for s in slots) == 1
            )

        self.report_progress(50, "Setting objective...")

        # Minimize the total assignment cost
        self.model.setObjective(
            self.xp.Sum(
                x[m, d, s] * costs.get((m, d, s), 0)
                for m in matches for d in days for s in slots
            ),
            sense=self.xp.minimize
        )

    def _read_attribute(self, name: str, default):
        """Return a problem attribute, or ``default`` if it cannot be read.

        Gap and iteration count are optional statistics; failing to read one
        must not turn an otherwise successful solve into an 'error' result.
        """
        try:
            return self.model.getAttrib(name)
        except Exception as e:
            logger.debug("Xpress attribute %r unavailable: %s", name, e)
            return default

    def solve(self) -> OptimizationResult:
        """Solve the model built by ``build_model``.

        ``maxtime``, ``miprelstop`` and ``threads`` are taken from
        ``settings.SOLVER_SETTINGS['xpress']``.

        Returns:
            OptimizationResult: 'optimal' or 'feasible' with the extracted
            solution and objective when a MIP solution exists; otherwise the
            mapped status with an empty solution and no objective. Any
            exception raised while solving is logged and reported as
            status 'error'.
        """
        import time

        self.report_progress(60, "Solving with Xpress...")

        solver_settings = settings.SOLVER_SETTINGS.get('xpress', {})

        # Set controls
        self.model.controls.maxtime = solver_settings.get('maxtime', 3600)
        self.model.controls.miprelstop = solver_settings.get('miprelstop', 0.01)
        self.model.controls.threads = solver_settings.get('threads', 4)

        # perf_counter is monotonic, so the duration survives clock changes.
        start_time = time.perf_counter()

        try:
            self.model.solve()
            solve_time = time.perf_counter() - start_time

            status_code = self.model.getProbStatus()
            status_map = {
                self.xp.mip_optimal: 'optimal',
                self.xp.mip_solution: 'feasible',
                self.xp.mip_infeas: 'infeasible',
                self.xp.mip_unbounded: 'unbounded',
            }
            has_solution = status_code in (self.xp.mip_optimal, self.xp.mip_solution)

            return OptimizationResult(
                status=status_map.get(status_code, 'unknown'),
                objective_value=self.model.getObjVal() if has_solution else None,
                # Only query variable values when a solution exists; asking an
                # infeasible/unbounded model for one may fail and would then
                # mask the real status behind 'error'.
                solution=self.extract_solution() if has_solution else {},
                solve_time=solve_time,
                # NOTE(review): confirm 'miprelgap' is a valid attribute name
                # for the installed Xpress version; if not, gap stays None.
                gap=self._read_attribute('miprelgap', None),
                iterations=self._read_attribute('simplexiter', 0),
            )

        except Exception as e:
            # Deliberate catch-all boundary: callers get an 'error' result
            # rather than an exception; the traceback goes to the log.
            logger.exception("Xpress error: %s", e)
            return OptimizationResult(
                status='error',
                objective_value=None,
                solution={},
                solve_time=time.perf_counter() - start_time,
                gap=None,
                iterations=0,
            )

    def extract_solution(self) -> dict:
        """Collect the variables whose solution value exceeds 0.5.

        Returns:
            dict: ``{family_name: {index: value}}`` for every variable family
            in ``self.variables``; values at or below 0.5 are left out.
        """
        solution = {}

        for var_name, var_dict in self.variables.items():
            selected = {}
            for key, var in var_dict.items():
                # Query each variable once instead of once for the filter and
                # once more for the stored value.
                value = self.model.getSolution(var)
                if value > 0.5:
                    selected[key] = value
            solution[var_name] = selected

        return solution


def get_optimizer(name: str, **kwargs) -> BaseOptimizer:
    """Build the optimizer selected by ``settings.SOLVER``.

    'xpress' selects the Xpress backend and falls back to PuLP when Xpress
    cannot be imported; any other value selects PuLP. ``name`` and ``kwargs``
    are passed straight to the optimizer's constructor.
    """
    if settings.SOLVER != 'xpress':
        return PuLPOptimizer(name, **kwargs)

    try:
        return XpressOptimizer(name, **kwargs)
    except ImportError:
        logger.warning("Xpress not available, falling back to PuLP")
        return PuLPOptimizer(name, **kwargs)

Step 3: Wrap in Celery Task

# scheduler/solver/tasks.py

from celery import shared_task
from celery.contrib.abortable import AbortableTask
from django.db import transaction

from taskmanager.models import Task as TaskRecord


@shared_task(
    bind=True,
    name='scheduler.optimize_scenario',
    base=AbortableTask,
    time_limit=7200,  # 2 hours
    soft_time_limit=7000,
)
def task_optimize_scenario(
    self,
    scenario_id: int,
    user_id: int = None,
    options: dict = None,
) -> dict:
    """
    Run optimization for a scenario.

    Works both as a queued Celery task and when called directly
    (synchronous run mode), where the request carries no task id and no
    delivery info.

    Args:
        scenario_id: ID of scenario to optimize
        user_id: Optional user for notifications
        options: Solver options override

    Returns:
        dict with optimization results. ``status`` is the solver status,
        'aborted' when an abort was requested between phases, or 'error'
        (with ``error`` and ``traceback``) when an exception was raised.
    """
    import logging
    import traceback

    from scheduler.models import Scenario, OptimizationRun
    from scheduler.solver.optimizer import get_optimizer

    log = logging.getLogger('custom')
    options = options or {}

    # A direct call (no broker involved) leaves the request id and the
    # delivery info unset; guard both so synchronous runs do not crash.
    task_id = self.request.id
    delivery_info = self.request.delivery_info or {}

    # Create task tracking record
    task_record = TaskRecord.objects.create(
        task_id=task_id,
        task_name='scheduler.optimize_scenario',
        scenario_id=scenario_id,
        user_id=user_id,
        queue=delivery_info.get('routing_key', 'celery'),
    )

    def progress_callback(percent: int, message: str):
        """Update progress in both Celery and TaskRecord."""
        # Celery state can only be stored against a task id.
        if task_id is not None:
            self.update_state(
                state='PROGRESS',
                meta={'progress': percent, 'status': message}
            )
        task_record.update_progress(percent, message)

    def abort_check() -> bool:
        """Check if task should abort (never true without a task id)."""
        return task_id is not None and self.is_aborted()

    def aborted() -> dict:
        """Close the tracking record and build the 'aborted' result."""
        # NOTE(review): assumes mark_completed() sets completed_at, which the
        # "already running" check in the view relies on; confirm.
        task_record.mark_completed()
        return {'status': 'aborted', 'scenario_id': scenario_id}

    try:
        # Load scenario with related data
        scenario = Scenario.objects.select_related(
            'season', 'season__league'
        ).prefetch_related(
            'matches__home_team',
            'matches__away_team',
            'days',
            'kick_off_times',
        ).get(pk=scenario_id)

        progress_callback(5, 'Preparing optimization data...')

        # Prepare data for solver
        data = prepare_optimization_data(scenario, options)

        if abort_check():
            return aborted()

        # Create optimizer
        optimizer = get_optimizer(
            name=f'scenario_{scenario_id}',
            progress_callback=progress_callback,
            abort_check=abort_check,
        )

        # Build and solve
        optimizer.build_model(data)

        if abort_check():
            return aborted()

        result = optimizer.solve()

        progress_callback(80, 'Processing results...')

        if abort_check():
            return aborted()

        # Save results if successful; the solution and its run record are
        # written together or not at all.
        if result.is_success():
            with transaction.atomic():
                apply_solution_to_scenario(scenario, result.solution)

                # Create optimization run record
                OptimizationRun.objects.create(
                    scenario=scenario,
                    status=result.status,
                    objective_value=result.objective_value,
                    solve_time=result.solve_time,
                    gap=result.gap,
                    settings=options,
                )

        progress_callback(100, 'Complete')
        task_record.mark_completed()

        return {
            'status': result.status,
            'scenario_id': scenario_id,
            'objective': result.objective_value,
            'solve_time': result.solve_time,
            'gap': result.gap,
        }

    except Exception as e:
        # Top-level boundary: the failure is returned as a result dict rather
        # than raised, so log it here or the traceback is lost to operators.
        log.exception('Optimization of scenario %s failed', scenario_id)
        # Close the record first so a failed run does not keep the scenario
        # flagged as "running", then write the error so it is the last update.
        # NOTE(review): assumes mark_completed() sets completed_at; confirm.
        task_record.mark_completed()
        task_record.update_progress(-1, f'Error: {str(e)}')
        return {
            'status': 'error',
            'scenario_id': scenario_id,
            'error': str(e),
            'traceback': traceback.format_exc(),
        }


def prepare_optimization_data(scenario, options: dict) -> dict:
    """Assemble the solver input for ``scenario``.

    Loads the scenario's matches, days and kick-off times and computes a cost
    for every (match, day, slot) combination via ``calculate_cost``.

    Returns:
        dict with the id lists (``matches``, ``days``, ``slots``), the
        ``costs`` table keyed by ``(match_id, day_id, slot_id)``, the match
        objects by id (``match_data``) and the ``options`` passed in.
    """
    match_rows = list(scenario.matches.select_related('home_team', 'away_team'))
    day_rows = list(scenario.days.all())
    slot_rows = list(scenario.kick_off_times.all())

    # One cost entry per combination (distances, preferences, etc.)
    cost_table = {
        (match.id, day.id, slot.id): calculate_cost(match, day, slot, options)
        for match in match_rows
        for day in day_rows
        for slot in slot_rows
    }

    payload = {
        'matches': [match.id for match in match_rows],
        'days': [day.id for day in day_rows],
        'slots': [slot.id for slot in slot_rows],
    }
    payload['costs'] = cost_table
    payload['match_data'] = {match.id: match for match in match_rows}
    payload['options'] = options
    return payload


def calculate_cost(match, day, slot, options) -> float:
    """Return the cost of placing ``match`` on ``day`` in ``slot``.

    The cost is the weighted home/away distance (``weight_distance``, default
    1.0, applied only when positive) plus ``preference_penalty`` (default
    100.0) when the match has a preferred day and ``day`` is a different one.
    ``slot`` does not influence the result.
    """
    total = 0.0

    # Distance term; the helper is imported only when the weight is positive
    distance_weight = options.get('weight_distance', 1.0)
    if distance_weight > 0:
        from common.functions import dist
        total += distance_weight * dist(match.home_team, match.away_team)

    # Preference term
    wanted_day = getattr(match, 'preferred_day', None)
    if wanted_day and day.id != match.preferred_day_id:
        total += options.get('preference_penalty', 100.0)

    return total


def apply_solution_to_scenario(scenario, solution: dict):
    """Write the chosen day and kick-off time of every assigned match.

    Reads the ``'x'`` family of ``solution`` (keys are
    ``(match_id, day_id, slot_id)``) and issues one update per entry whose
    value exceeds 0.5. ``scenario`` is accepted but not used here.
    """
    from scheduler.models import Match

    assignments = solution.get('x', {})

    # Collect every selected assignment before touching the database
    chosen = [
        (match_id, day_id, slot_id)
        for (match_id, day_id, slot_id), value in assignments.items()
        if value > 0.5
    ]

    for match_id, day_id, slot_id in chosen:
        target = Match.objects.filter(pk=match_id)
        target.update(day_id=day_id, kick_off_time_id=slot_id)

Step 4: Trigger from Views

# scheduler/views_func.py

from django.http import JsonResponse
from django.conf import settings
from common.decorators import crud_decorator


@crud_decorator(require_edit=True)
def start_optimization(request, scenario_id: int):
    """Start optimization for a scenario.

    Rejects the request when the scenario does not exist (404), when an
    optimization for it has no completion timestamp yet (400) or when the
    posted options are not numeric (400). Otherwise the task is queued
    (``RUN_MODE == 'celery'``) or executed synchronously.

    Returns:
        JsonResponse: 'started' with the task id for queued runs, the task
        result for synchronous runs, or an 'error' payload.
    """
    from scheduler.models import Scenario
    from scheduler.solver.tasks import task_optimize_scenario
    from taskmanager.models import Task as TaskRecord

    try:
        scenario = Scenario.objects.get(pk=scenario_id)
    except Scenario.DoesNotExist:
        # Answer in the same JSON shape as the other errors instead of a 500.
        return JsonResponse({
            'status': 'error',
            'message': 'Scenario not found',
        }, status=404)

    # Check if optimization is already running
    running = TaskRecord.objects.filter(
        scenario_id=scenario_id,
        task_name='scheduler.optimize_scenario',
        completed_at__isnull=True,
    ).exists()

    if running:
        return JsonResponse({
            'status': 'error',
            'message': 'Optimization already running for this scenario',
        }, status=400)

    # Get options from request; POST data is untrusted, so a non-numeric
    # value becomes a 400 response instead of an unhandled ValueError.
    try:
        options = {
            'weight_distance': float(request.POST.get('weight_distance', 1.0)),
            'weight_fairness': float(request.POST.get('weight_fairness', 1.0)),
            'time_limit': int(request.POST.get('time_limit', 3600)),
        }
    except (TypeError, ValueError):
        return JsonResponse({
            'status': 'error',
            'message': 'Invalid optimization options',
        }, status=400)

    # Start task based on run mode
    if settings.RUN_MODE == 'celery':
        result = task_optimize_scenario.delay(
            scenario_id=scenario.pk,
            user_id=request.user.pk,
            options=options,
        )
        return JsonResponse({
            'status': 'started',
            'task_id': result.id,
            'message': 'Optimization started in background',
        })

    # Synchronous execution
    result = task_optimize_scenario(
        scenario_id=scenario.pk,
        user_id=request.user.pk,
        options=options,
    )
    return JsonResponse({
        'status': result.get('status'),
        'result': result,
    })

Patterns & Best Practices

Graceful Degradation

# scheduler/solver/__init__.py

def get_task_optimize():
    """Return the optimization task, or a stand-in when it cannot be imported.

    If importing ``scheduler.solver.tasks`` fails with ImportError, a warning
    is logged and a callable is returned that accepts any arguments and
    reports that the solver is not configured.
    """
    def dummy_task(*args, **kwargs):
        # Placeholder used when the solver module is missing
        return {'status': 'error', 'message': 'Solver not configured'}

    try:
        from scheduler.solver.tasks import task_optimize_scenario
    except ImportError as e:
        import logging
        logging.warning(f"Solver module not available: {e}")
        return dummy_task

    return task_optimize_scenario


# Resolved once at import time; callers use this name directly.
task_optimize = get_task_optimize()

Progress Callback Pattern

def create_progress_reporter(task, task_record):
    """Create a progress reporter function for the optimizer.

    Args:
        task: Bound Celery task; its ``update_state`` and ``request.id``
            are used.
        task_record: Tracking record; its ``update_progress`` is called.

    Returns:
        A ``report(percent, message)`` callable that updates the Celery
        state, then the record, then logs the update.
    """
    def report(percent: int, message: str):
        import logging

        # Imported here so the snippet is self-contained; without it the
        # ``timezone`` name below is undefined.
        from django.utils import timezone

        # Update Celery state
        task.update_state(
            state='PROGRESS',
            meta={
                'progress': percent,
                'status': message,
                'timestamp': timezone.now().isoformat(),
            }
        )

        # Update database record
        task_record.update_progress(percent, message)

        # Log for monitoring
        logging.info(f"[{task.request.id}] {percent}% - {message}")

    return report

Solver Parameter Tuning

# Adjust parameters based on problem size
def get_solver_params(data: dict) -> dict:
    """Pick solver parameters from the size of the problem in ``data``.

    The size is the product of the lengths of ``data['matches']``,
    ``data['days']`` and ``data['slots']``. Below 10,000 the tightest gap and
    shortest time limit are returned, below 100,000 the medium profile, and
    otherwise the large profile, which also sets presolve and heuristics.
    """
    variable_count = (
        len(data['matches']) * len(data['days']) * len(data['slots'])
    )

    # Largest tier first, so each check only needs a lower bound
    if variable_count >= 100000:
        return {
            'timeLimit': 3600,
            'gapRel': 0.05,
            'threads': 8,
            'presolve': 1,
            'heuristics': 1,
        }

    if variable_count >= 10000:
        return {'timeLimit': 1800, 'gapRel': 0.01, 'threads': 4}

    return {'timeLimit': 300, 'gapRel': 0.001, 'threads': 2}

Common Pitfalls

  • Memory issues: Large models can consume significant memory; use sparse data structures
  • Timeout handling: Always set time limits and handle timeout results gracefully
  • Integer infeasibility: Check for infeasible constraints before large solve attempts
  • Missing abort checks: Long solves must periodically check for abort signals
  • Transaction boundaries: Wrap solution application in atomic transactions

Verification

Test solver integration:

# In Django shell
from scheduler.solver.optimizer import get_optimizer, PuLPOptimizer

# Smoke-test the PuLP backend on a tiny instance: 3 matches, 2 days, 1 slot,
# where assigning match m to day d costs m * d.
matches, days, slots = [1, 2, 3], [1, 2], [1]
data = {
    'matches': matches,
    'days': days,
    'slots': slots,
    'costs': {(m, d, s): m * d for m in matches for d in days for s in slots},
}

opt = PuLPOptimizer('test')
opt.build_model(data)
result = opt.solve()
print(result.status, result.objective_value)