Source code for docplex.cp.solver.solver_listener

# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2017 - 2022
# --------------------------------------------------------------------------
# Author: Olivier OUDOT, IBM Analytics, France Lab, Sophia-Antipolis

"""
This module defines the class :class:`CpoSolverListener` that allows to be warned about different steps of the solve.

Any number of listeners can be added to a solver using the method :meth:`docplex.cp.solver.solver.CpoSolver.add_listener`.
Listeners can also be added on the model object using :meth:`docplex.cp.model.CpoModel.add_listener`

This module also defines some default useful listeners:

 * :class:`AutoStopListener`: Listener that stops the solve if configurable conditions are reached.
 * :class:`DelayListener`: Utility listener that waits some time at each solution found.
 * :class:`SolverProgressPanelListener`: implements a progress panel that appears when the solve is started.
   This panel is based on the package *Tkinter* that is available only in Python 2.7.14 and Python 3.X.

All these listeners are effective if the model provides multiple solutions. They are then more adapted to
optimization problems.

To be able to process multiple solutions, the model should be solved:

 * using the solution iterator given by method :meth:`docplex.cp.model.CpoModel.start_search`,
 * or using the method :meth:`docplex.cp.model.CpoModel.solve`
   but setting the parameter *context.solver.solve_with_search_next* to True.

*New in version 2.8.*

Detailed description
--------------------
"""

from docplex.cp.solution import CpoSolveResult, CpoRefineConflictResult

# List of possible operations
OPERATION_SOLVE           = 'Solve'
OPERATION_PROPAGATE       = 'Propagate'
OPERATION_REFINE_CONFLICT = 'RefineConflict'
OPERATION_RUN_SEEDS       = 'RunSeeds'


###############################################################################
## Listener class
###############################################################################

[docs]class CpoSolverListener(object): """ Solve listener to be warned about different solving steps. This class is an 'abstract' class that must be extended by actual listener implementation. It implements methods that are called to be warned about noticeable solving steps. These methods are all taking the source solver as first argument. There are: * :meth:`solver_created` signals the solver has just been created, * :meth:`start_operation` signals the start of a solve operation, * :meth:`end_operation` signals the end of a solve operation, * :meth:`new_result` notifies about a new result, * :meth:`new_log_data` notifies about a new piece of log data. For a more specialized notification, the following methods are also implemented (here empty), called by the default implementation of the methods listed above (except *solver_created*). This means that, if the methods above are overwritten, the following methods may never be called, except if the new implementation calls for super(). * :meth:`start_solve` signals that a solve operation is started, * :meth:`end_solve` signals that a solve operation is ended, * :meth:`result_found` notifies a new solve result, * :meth:`start_refine_conflict` signals that a refine_conflict operation is started, * :meth:`end_refine_conflict` signals that a refine_conflict operation is ended, * :meth:`conflict_found` notifies about the result of a refine_conflict. """
[docs] def solver_created(self, solver): """ Notify the listener that the solver object has been created. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ pass
[docs] def start_operation(self, solver, op): """ Notify that an operation is started. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) op: Operation that is started, in constants OPERATION_* """ # For compatibility reasons, default implementation calls legacy functions if op is OPERATION_SOLVE: self.start_solve(solver) elif op is OPERATION_REFINE_CONFLICT: self.start_refine_conflict(solver)
[docs] def end_operation(self, solver, op): """ Notify that an operation is terminated. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) op: Operation that is terminated, in constants OPERATION_* """ # For compatibility reasons, default implementation calls legacy functions if op is OPERATION_SOLVE: self.end_solve(solver) elif op is OPERATION_REFINE_CONFLICT: self.end_refine_conflict(solver)
[docs] def new_result(self, solver, result): """ Signal that a new result has been found. Depending on the operation in progress, result object may be: * an object of class :class:`~docplex.cp.solution.CpoSolveResult` if operation is a solve, * an object of class :class:`~docplex.cp.solution.CpoRefineConflictResult` if the operation is a refine_conflict. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) result: New result that has been found """ # For compatibility reasons, default implementation calls legacy functions if isinstance(result, CpoSolveResult): self.result_found(solver, result) elif isinstance(result, CpoRefineConflictResult): self.conflict_found(solver, result)
[docs] def new_log_data(self, solver, data): """ Signal a new piece of log data. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) data: New log data as a string """ pass
[docs] def start_solve(self, solver): """ Notify that the solve is started. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ pass
[docs] def end_solve(self, solver): """ Notify that the solve is ended. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ pass
[docs] def result_found(self, solver, sres): """ Signal that a solve result has been found. This method is called every time a result is provided by the solver. The result, in particular the last one, may not contain any solution. This should be checked calling method sres.is_solution(). This method replaces deprecated method solution_found() that is confusing as result may possibly not contain a solution to the model. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) sres: Solve result, object of class :class:`~docplex.cp.solution.CpoSolveResult` """ pass
[docs] def start_refine_conflict(self, solver): """ Notify that the refine conflict is started. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ pass
[docs] def end_refine_conflict(self, solver): """ Notify that the refine conflict is ended. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ pass
[docs] def conflict_found(self, solver, cflct): """ Signal that a conflict result has been found. This method is called when a conflict result is found by the solver when method refine_conflict() is called. Note that the conflict result may contain no conflict. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) cflct: Conflict descriptor, object of class :class:`~docplex.cp.solution.CpoRefineConflictResult` """ pass
############################################################################### ## Solver listenet that just log events. ###############################################################################
[docs]class LogSolverListener(CpoSolverListener): """ Solve listener that just log listener events. *New in version 2.8.* """ def __init__(self, prefix=""): super(LogSolverListener, self).__init__() self.prefix = prefix
[docs] def solver_created(self, solver): """ Notify the listener that the solver object has been created. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) """ print(str(self.prefix) + "Solver created")
[docs] def start_operation(self, solver, op): """ Notify that an operation is started. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) op: Operation that is started, in constants OPERATION_* """ print(str(self.prefix) + "Operation '{}' started".format(op))
[docs] def end_operation(self, solver, op): """ Notify that an operation is terminated. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) op: Operation that is terminated, in constants OPERATION_* """ print(str(self.prefix) + "Operation '{}' terminated".format(op))
[docs] def new_result(self, solver, result): """ Signal that a new result has been found. Args: solver: Originator CPO solver (object of class :class:`~docplex.cp.solver.solver.CpoSolver`) result: New result that has been found """ print(str(self.prefix) + "New '{}' found".format(type(result).__name__))
############################################################################### ## Implementation of a solve listener that stops search when no new solution ## is given during a given amount of time ############################################################################### from threading import Condition
[docs]class AutoStopListener(CpoSolverListener): """ Solver listener that aborts a search when a predefined criteria is reached. *New in version 2.8.* """ def __init__(self, qsc_time=None, qsc_sols=None, min_sols=0, max_sols=None): """ **Constructor** Create a new solver listener that aborts the solve if defined criteria are reached. Args: min_sols: Minimun number of solutions to be found before stopping the solve max_sols: Maximum number of solutions after which solve is stopped qsc_time: Quiesce time limit. Max time, in seconds, after which solver is stopped if no new solution is found. qsc_sols: Quiesce time limit expressed as a number of solutions. Actual quiesce time limit is computed as this value multiplied by the average time between solutions. """ # Initialize next abort time self.min_sols = min_sols self.max_sols = max_sols self.qsc_time = qsc_time self.qsc_sols = qsc_sols self.nb_sols = 0 # Current number of solutions found self.start_time = time.time() self.last_sol_time = None # Time of the last solution # Set time checking active indicator self.time_active = self.qsc_time is not None or self.qsc_sols is not None self.condition = Condition()
[docs] def start_solve(self, solver): # Store solver self.solver = solver # Check if a time limit is defined if self.time_active: self.abort_time = self._compute_next_abort_time() # Start time waiting thread self.time_active = True thread = Thread(target=self._waiting_loop) thread.start()
[docs] def end_solve(self, solver): # Stop time control thread if any self._stop_waiting_loop()
[docs] def result_found(self, solver, msol): self.nb_sols += 1 self.last_sol_time = time.time() # Check if minimal number of solutions not reached if self.nb_sols < self.min_sols: return # Check if max number of solutions has been reached if self.max_sols and self.nb_sols >= self.max_sols: # Abort search self._stop_waiting_loop() self.solver.abort_search() return # Update next abort time if self.time_active: self.abort_time = self._compute_next_abort_time() with self.condition: self.condition.notify()
def _compute_next_abort_time(self): if self.nb_sols < self.min_sols: return None if self.qsc_sols is not None: if self.nb_sols > 0: delay = self.qsc_sols * ((time.time() - self.start_time) / self.nb_sols) if self.qsc_time is not None: delay = min(delay, self.qsc_time) return time.time() + delay else: if self.qsc_time is not None: return time.time() + self.qsc_time return None if self.qsc_time is not None: return time.time() + self.qsc_time return None def _waiting_loop(self): """ Timer thread body """ abort_search = False self.condition.acquire() while self.time_active: if self.abort_time is None: self.condition.wait() else: ctime = time.time() if self.abort_time <= ctime: self.time_active = False abort_search = True else: self.condition.wait(self.abort_time - ctime) self.condition.release() if abort_search: self.solver.abort_search() def _stop_waiting_loop(self): # Stop time control thread if any self.condition.acquire() self.time_active = False self.condition.notify() self.condition.release()
############################################################################### ## Implementation of a solve listener that slow down solutions flow by waiting ## a given delay. ###############################################################################
[docs]class DelayListener(CpoSolverListener): """ Solver listener that waits a given delay after each solution. *New in version 2.8.* """ def __init__(self, delay): """ **Constructor** Create a new solver listener that waits a given delay after each solution. Args: delay: Wait delay in seconds """ self.delay = delay
[docs] def result_found(self, solver, msol): time.sleep(self.delay)
############################################################################### ## Implementation of a solve progress panel based on Tkinter ############################################################################### from threading import Thread import time from docplex.cp.utils import CpoNotSupportedException _UNIT_MULTIPLIER = {'k': 1000, 'K': 1000, 'M': 1000000, 'm': 1000000, 'G': 1000000000, 'g': 1000000000, 'b': 1, 'B': 1} TK_INTER_AVAILABLE = True try: # Python 2 version import Tkinter as TK import tkMessageBox as messagebox except: try: # Python 3 version import tkinter as TK from tkinter import messagebox as messagebox except: TK_INTER_AVAILABLE = False if not TK_INTER_AVAILABLE: class SolverProgressPanelListener(CpoSolverListener): def __init__(self, parse_log=False): raise CpoNotSupportedException("Tkinter is not available in this Python environment") else: class _SolverInfoPanel: """ Solver info panel that display objective value(s) and KPIs. *New in version 2.8.* """ def __init__(self, master, model, stopfun): """**Constructor** Create a new solver info panel. Args: master: Master TK component model: Model to be solved stopfun: Function to be called if stop is requested by user """ # Store attributes self.master = master self.model = model self.stopfun = stopfun self.start_time = time.time() # Set window title master.title("CPO solver infos") # Add name of the model self.model_label = TK.Label(master, text="Solving '" + self.model.get_name() + "'", font="Helvetica 10 bold") self.model_label.grid(row=0, column=0, columnspan=2) row = 1 # Add solve time TK.Label(master, text="Run time:").grid(row=row, column=0, sticky=TK.E) self.time_text = TK.StringVar() self.time_text.set("00:00:00") TK.Label(master, textvariable=self.time_text, bg='white', width=10, anchor=TK.E)\ .grid(row=row, column=1, sticky=TK.E) row += 1 master.after(1000, self._update_time) # Add objective value if any if model.get_optimization_expression() is None: self.objective_value_text = None else: TK.Label(master, text="Objective value:").grid(row=row, column=0, sticky=TK.E) self.objective_value_text = TK.StringVar() self.objective_value_text.set("unknown") TK.Label(master, textvariable=self.objective_value_text, bg='white', width=10, anchor=TK.E)\ .grid(row=row, column=1, sticky=TK.E) row += 1 # Add KPIs if any kpis = model.get_kpis() self.kpi_value_texts = [] for k in sorted(kpis.keys()): TK.Label(master, text=k + ":").grid(row=row, column=0, sticky=TK.E) txt = TK.StringVar() txt.set("unknown") self.kpi_value_texts.append(txt) TK.Label(master, textvariable=txt, bg='white', width=10, anchor=TK.E)\ .grid(row=row, column=1, sticky=TK.E) row += 1 # Add stop button self.stop_button = TK.Button(master, text="Stop solve", command=self.stopfun) self.stop_button.grid(row=row, column=0, columnspan=2) # Initialize processing of possible additional infos self.infos_texts = {} # Dictionary of info texts per name self.last_row = row def notify_solution(self, msol): """ Notify progress panel about a new solution Args: msol: New model solution """ if msol: # Update objective value if self.objective_value_text: self.objective_value_text.set(', '.join(str(v) for v in msol.get_objective_values())) # Update KPIs kpis = msol.get_kpis() for i, k in enumerate(sorted(kpis.keys())): self.kpi_value_texts[i].set(str(kpis[k])) def notify_infos(self, infos): """ Notify progress panel about last infos values Args: infos: Information dictionary """ #print("Current infos: {}".format(infos)) for k, v in infos.items(): if k in self.infos_texts: txt = self.infos_texts[k] else: # Create a new label for info TK.Label(self.master, text=k + ":").grid(row=self.last_row, column=0, sticky=TK.E) txt = TK.StringVar() self.infos_texts[k] = txt TK.Label(self.master, textvariable=txt, bg='white', width=10, anchor=TK.E)\ .grid(row=self.last_row, column=1, sticky=TK.E) self.last_row += 1 self.stop_button.grid(row=self.last_row, column=0, columnspan=2) txt.set(str(v)) def _update_time(self): """ Update value of time field """ dur = int(time.time() - self.start_time) hours = dur // 3600 mins = (dur //60) % 60 secs = dur % 60 self.time_text.set("{:02d}:{:02d}:{:02d}".format(hours, mins, secs)) self.master.after(1000, self._update_time) class _CpoSolverProgressPanel(Thread): """ Main class of the solve progress panel. It is implemented in a separate thread to be able to run in parallel from the model solving. """ def __init__(self, solver): """ Create a new solver progress panel Args: solver: Solver to track stopfun: Function to be called if stop is requested by user """ super(_CpoSolverProgressPanel, self).__init__() self.solver = solver self.model = solver.get_model() self.active = True self.display_panel = None self.initdone = False self.last_solution = None self.last_infos = {} self.info_updates = {} def run(self): """ Thread main loop """ # Create GUI components self.root = TK.Tk() self.display_panel = _SolverInfoPanel(self.root, self.model, self._stop_requested_by_user) self.root.after_idle(self._poll_updates) self.root.protocol("WM_DELETE_WINDOW", self.on_closing) self.initdone = True # Start mainloop self.root.mainloop() def notify_solution(self, msol): """ Notify progress panel about a new solution Args: msol: New model solution """ self.last_solution = msol def notify_infos(self, infos): """ Notify miscellaneous information given in a dictionary Args: infos: Information dictionary """ for k, v in infos.items(): ov = self.last_infos.get(k) if ov != v: self.last_infos[k] = v self.info_updates[k] = v def _stop_requested_by_user(self): """ Notify a stop requested by end user through GUI """ self.active = False self.solver.abort_search() def start(self): """ Start the thread and wait for init completed """ super(_CpoSolverProgressPanel, self).start() while not self.initdone: time.sleep(0.01) def stop(self): """ Request progress panel to stop and wait for thread termination """ self.active = False self.join() def on_closing(self): """ Procedure called when closing the window """ if messagebox.askokcancel("Quit", "Do you really want to terminate solving of model '{}' ?".format(self.model.get_name())): self._stop_requested_by_user() def _poll_updates(self): """ Polling of external changes by the TK mainloop """ # Check stop requested if not self.active: self.root.quit() return # Check new solution sol = self.last_solution if sol is not None: self.last_solution = None self.display_panel.notify_solution(sol) ifoupd = self.info_updates if ifoupd: self.info_updates = {} self.display_panel.notify_infos(ifoupd) # Rearm callback self.root.after(1000, self._poll_updates)
[docs] class SolverProgressPanelListener(CpoSolverListener): """ Solver listener that display a solving progress panel. The solver progress panel displays the following information: * the time elapsed since the beginning of the solve, * the last known objective (if any) * the last known values of the KPIs (if any) * a *Stop solve* button allowing to stop the solve and keep the last known solution as model solution. * if parse_log indicator is set, information taken from the log: memory usage, bounds, etc *New in version 2.8.* Args: parse_log(optional): Enable log parsing to retrieve additional information such as memory, bounds, etc """ def __init__(self, parse_log=False): super(SolverProgressPanelListener, self).__init__() self.parse_log = parse_log def solver_created(self, solver): # Force solve with search_next solver.set_solve_with_search_next(True) def start_solve(self, solver): """ Notify that the solve is started. """ # Create progress panel self.progress_panel = _CpoSolverProgressPanel(solver) self.progress_panel.start() def end_solve(self, solver): """ Notify that the solve is ended. """ self.progress_panel.stop() def result_found(self, solver, msol): """ Signal that a solution has been found. """ self.progress_panel.notify_solution(msol) def new_log_data(self, solver, data): """ Signal a new piece of log data. """ if not self.parse_log: return # Search for solve infos for line in data.splitlines(): if line.startswith(" ! Time = "): infos = {} ex = line.find("s,", 10) if ex > 0: infos['SolveTime'] = float(line[10:ex]) sx = line.find("Average fail depth = ", ex) if sx >= 0: sx += 21 ex = line.find(", ", sx) if ex > 0: infos['Average fail depth'] = int(line[sx:ex]) sx = line.find("Memory usage = ", ex) if sx >= 0: sx += 15 ex = line.find(" ", sx) if ex > 0: #infos['Memory usage'] = int(float(line[sx:ex]) * _UNIT_MULTIPLIER[line[ex+1]]) infos['Memory usage'] = line[sx:] self.progress_panel.notify_infos(infos) elif line.startswith(" ! Current bound is "): infos = {} ex = line.find(" (", 20) if ex > 0: #infos['Current bound'] = [int(x) for x in line[20:ex].split(";")] infos['Current bound'] = line[20:ex] self.progress_panel.notify_infos(infos) elif line.startswith(" ! Current objective is "): infos = {'Current objective': line[24:]} #infos['Current objective'] = [int(x) for x in line[24:].split(";")] self.progress_panel.notify_infos(infos)