Source code for docplex.mp.progress

# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2015, 2019
# --------------------------------------------------------------------------
'''This package contains classes to monitor the progress of a MIP solve.

This abstract class defines protocol methods, which are called from within CPLEX's MIP search algorithm.

Progress listeners are based on CPLEX's MIPInfoCallback, and work only with a
local installation of CPLEX, not on Cplexcloud.

At each node, all progress listeners attached to the model receive progress data
through a `notify_progress` method, with an instance of :class:`~ProgressData`.
This named tuple class contains information about solve time, objective value,
best bound, gap and nodes, but no variable values.

The base class for progress listeners is :class:`~ProgressListener`.
DOcplex provides concrete sub-classes to handle basic cases, but
you may also create your own sub-class of :class:`~ProgressListener` to suit your needs.
Each instance of :class:`~ProgressListener` must be attached to a model to receive events during
a MIP solve., using `Model.add_progress_listener`.

An example of such a progress listener is :class:`TextProgressListener`,
which prints a message to stdout in a format similar to CPLEX log, each time it receives a progress data.

If you need to get the values of variables in an intermediate solution,
you should sub-class from class :class:`~SolutionListener`.
In this case, the :func:`~ProgressListener.notify_solution` method will be called
each time CPLEX finds a valid intermediate solution, with an instance of :class:`docplex.mp.solution.SolveSolution`.

Listeners are called from within CPLEX code, with its own frequency. It may occur that listeners are
called with no real progress in either the objective or the best bound. To help you chose what kind
of calls you are really interested in, DOcplex uses the enumerated class :class:`~ProgressClock'.
The baseline clock is the CPLEX clock, but you can refine this clock: for example,
`ProgressClock.Objective` filters all calls but those where the objective has improved.

To summarize, listeners are called from within CPLEX's MIP search algorithm, when the clock
you have chosen accepts the call. All listeners are created with a clock argument that controls which
calls they accept.

When a listener accepts a call, its method :func:`~ProgressListener.notify_progress` is called with the current progress data.
See :class:`~TextProgressListener` as an example of a simple progress listener.

In addition, if the listener derives from :class:`~SolutionListener`, its method :func:`~ProgressListener.notify_solution` is also called
with the intermediate solution, passed as an instance of :class:`docplex.mp.solution.SolveSolution`.
See :class:`~SolutionRecorder` as an example of solution listener

'''
from enum import Enum
from collections import namedtuple


from docplex.mp.utils import is_string, is_number
from docplex.mp.error_handler import docplex_fatal

_TProgressData_ = namedtuple('_TProgressData',
                             ['id', 'has_incumbent',
                              'current_objective', 'best_bound', 'mip_gap',
                              'current_nb_iterations', 'current_nb_nodes', 'remaining_nb_nodes',
                              'time', 'det_time'])


# noinspection PyUnresolvedReferences
[docs]class ProgressData(_TProgressData_): """ A named tuple class to hold progress data, as reeived from CPLEX. Attributes: has_incumbent: a boolean, indicating whether an incumbent solution is available (or not), at the moment the listener is called. current_objective: contains the current objective, if an incumbent is available, else None. best_bound: The current best bound as reported by the solver. mip_gap: the gap between the best integer objective and the objective of the best node remaining; available if an incumbent is available. current_nb_nodes: the current number of nodes. current_nb_iterations: the current number of iterations. remaining_nb_nodes: the remaining number of nodes. time: the elapsed time since solve started. det_time: the deterministic time since solve started. """ def __str__(self): # pragma: no cover fmt = 'ProgressData({0}, {1}, obj={2}, bbound={3}, #nodes={4})'. \ format(self.id, self.has_incumbent, self.current_objective, self.best_bound, self.current_nb_nodes) return fmt
[docs]class ProgressClock(Enum): """ This enumerated class controls the type of events a listener listens to. The possible values are (in order of decreasing call frequency). - All: the listener listens to all calls from CPLEX. - BestBound: listen to changes in best bound, not necessarily with a solution present. - Solutions: listen to all intermediate solutions, not necessarily impriving. Nothing prevents being called several times with an identical solution. - Gap: listen to intermediate solutions, where either objective or best bound has improved. - Objective: listen to intermediate solutions, where the solution objective has improved. To determine whether objective or best bound have improved (or not), see the numerical parameters `absdiff` and `reldiff` in `:class:ProgressListener` and all its descendant classes. *New in version 2.10* """ # # An enumeration of filtering levels for listeners. # All = 0 Solutions = 1 BestBound = 2 # best bound clock is indepedent from solution clock Objective = 5 # b101 Gap = 7 # the gap clock changes when there is a solution, and either bound or objective has changed @property def listens_to_solution(self): # returns true if the enum listens to solutions return 1 == self.value & 1 @classmethod def parse(cls, arg): if isinstance(arg, ProgressClock): return arg else: # int value for fl in cls: if arg == fl.value: return fl elif is_string(arg) and arg.lower() == fl.name.lower(): return fl else: # pragma: no cover raise ValueError('Expecting filter level, {0!r} was passed'.format(arg))
default_absdiff = 1e-1 default_reldiff = 1e-2 class _AbstractProgressListener(object): def __init__(self): self._cb = None self._current_progress_data = None def _get_model(self): ccb = self._cb return ccb._model if ccb else None def _set_current_progress_data(self, pdata): # INTERNAL self._current_progress_data = pdata def _disconnect(self): # INTERNAL self._cb = None def _connect_cb(self, cb): # INTERNAL self._cb = cb def abort(self): ccb = self._cb if ccb is not None: ccb.abort() else: print('!!! callback is not connected - abort() ignored') def notify_start(self): self._current_progress_data = None
[docs]class ProgressListener(_AbstractProgressListener): ''' The base class for progress listeners. ''' def __init__(self, clock_arg=ProgressClock.All, absdiff=None, reldiff=None): super().__init__() clock = ProgressClock.parse(clock_arg) self._clock = clock self._absdiff = absdiff if absdiff is not None else default_absdiff self._reldiff = reldiff if reldiff is not None else default_reldiff self._filter = make_clock_filter(clock, absdiff, reldiff) @property def clock(self): """ Returns the clock of the listener. :return: an instance of :class:`~ProgressClock` """ return self._clock @property def abs_diff(self): return self._absdiff @property def relative_diff(self): return self._reldiff @property def current_progress_data(self): """ This property return the current progress data, if any. Returns the latest progress data, if at least one has been received. :return: an instance of :class:~ProgressData` or None. """ return self._current_progress_data def accept(self, pdata): return self._filter.accept(pdata) def requires_solution(self): return False
[docs] def notify_solution(self, s): """ Redefine this method to handle an intermediate solution from the callback. Args: s: solution Note: if you need to access runtime information (time, gap, number of nodes), use :func:`SolutionListener.current_progress_data`, which contains the latest priogress information as a tuple. """ pass # pragma: no cover
[docs] def notify_start(self): """ This methods is called on all attached listeners at the beginning of a solve(). When writing a custom listener, this method is used to restore internal data which can be modified during solve. Always call the superclass `notify_start` before adding specific code in a sub-class. """ super().notify_start() self._filter.reset()
def notify_jobid(self, jobid): pass # pragma: no cover
[docs] def notify_end(self, status, objective): """The method called when solve is finished on a model. The status is the solve status from the solve() method """ pass # pragma: no cover
[docs] def notify_progress(self, progress_data): """ This method is called from within the solve with a ProgressData instance. :param progress_data: an instance of :class:`ProgressData` containing data about the current point in the search tree. """ pass # pragma: no cover
[docs] def abort(self): ''' Aborts the CPLEX search. This method tells CPLEX to stop the MIP search. You may use this method in a custom progress listener to stop the search based on your own criteria (for example, when improvements in gap is consistently below a minimum threshold). Note: This method should be called from within a :func:`notify_progress` method call, otherwise it will have no effect. ''' super().abort()
# noinspection PyUnusedLocal,PyMethodMayBeStatic class FilterAcceptAll(object): def accept(self, pdata): return True def reset(self): pass # noinspection PyMethodMayBeStatic class FilterAcceptAllSolutions(object): def accept(self, pdata): return pdata.has_incumbent def reset(self): pass class Watcher(object): def __init__(self, name, absdiff, reldiff, update_fn): assert absdiff >= 0 assert reldiff >= 0 self.name = name self._watched = None self._old = None self._absdiff = absdiff self._reldiff = reldiff self._update_fn = update_fn def reset(self): self._watched = None self._old = None def accept(self, progress_data): accepted = False old = self._watched new_watched = self._update_fn(progress_data) # if new_watched is None, then it is not available (e.g. objective) if new_watched is not None: if old is None: accepted = True else: # assert is a number delta = abs(new_watched - old) reldiff = self._reldiff absdiff = self._absdiff if 0 < absdiff <= abs(new_watched - old): accepted = True elif reldiff and delta / (1 + abs(old)) >= reldiff: accepted = True return accepted def sync(self, pdata): cur = self._watched self._watched = self._update_fn(pdata) self._old = cur def __str__(self): # pragma: no cover ws = '--' if self._watched is None else self._watched return 'W_{0}[{1}]'.format(self.name, ws) def clock_filter_accept_stop_here(watcher, pdata): pass class ClockFilter(object): def __init__(self, level, obj_absdiff, bbound_absdiff, obj_reldiff=0, bbound_reldiff=0, node_delta=-1): watchers = [] if obj_absdiff > 0 or obj_reldiff > 0: def update_obj(pdata): return pdata.current_objective if pdata.has_incumbent else None obj_watcher = Watcher(name='obj', absdiff=obj_absdiff, reldiff=obj_reldiff, update_fn=update_obj) watchers.append(obj_watcher) if bbound_absdiff > 0 or bbound_reldiff > 0: def update_bbound(pdata): return pdata.best_bound watchers.append(Watcher(name='gap_bound', absdiff=bbound_absdiff, reldiff=bbound_reldiff, update_fn=update_bbound)) if node_delta > 0: used_node_delta = max(node_delta, 1) def update_nodes(pdata): return pdata.current_nb_nodes watchers.append(Watcher(name='nodes', absdiff=used_node_delta, reldiff=0, update_fn=update_nodes)) self._watchers = watchers self._clock = level self._listens_to_solution = level.listens_to_solution def accept(self, progress_data): if not progress_data.has_incumbent and self._listens_to_solution: return False # the filter accepts data as soon as one of its cells accepts it. poke = self.peek(progress_data) if poke is None: return False else: clock_filter_accept_stop_here(poke, progress_data) # print("- accepting event #{0}, reason: {1}".format(progress_data.id, poke.name)) for w in self._watchers: w.sync(progress_data) return True def peek(self, progress_data): for w in self._watchers: if w.accept(progress_data): return w else: return None def reset(self): for w in self._watchers: w.reset() # noinspection PyArgumentEqualDefault def make_clock_filter(level, absdiff, reldiff, nodediff=0): absdiff_ = 1e-1 if absdiff is None else absdiff reldiff_ = 1e-2 if reldiff is None else reldiff if level == ProgressClock.All: return FilterAcceptAll() elif level == ProgressClock.Solutions: return FilterAcceptAllSolutions() elif level == ProgressClock.Gap: return ClockFilter(level, obj_absdiff=absdiff_, obj_reldiff=reldiff_, bbound_absdiff=absdiff_, bbound_reldiff=reldiff_, node_delta=nodediff) elif level == ProgressClock.Objective: return ClockFilter(level, obj_absdiff=absdiff_, obj_reldiff=reldiff_, bbound_absdiff=0, bbound_reldiff=0, node_delta=nodediff) elif level == ProgressClock.BestBound: return ClockFilter(level, obj_absdiff=0, obj_reldiff=0, bbound_absdiff=absdiff_, bbound_reldiff=reldiff_) else: # pragma: no cover raise ValueError('unexpected level: {0!r}'.format(level))
[docs]class TextProgressListener(ProgressListener): """ A simple implementation of Progress Listener, which prints messages to stdout, in the manner of the CPLEX log. :param clock: an enumerated value of type `:class:ProgressClock` which defines the frequency of the listener. :param absdiff: a float value which controls the minimum absolute change for objective or best bound values :param reldiff: a float value which controls the minimum absolute change for objective or best bound values. Note: The default behavior is to listen to an improvement in either the objective or best bound, each being improved by either an absolute value change of at least `absdiff`, or a relative change of at least `reldiff`. """ def __init__(self, clock=ProgressClock.Gap, gap_fmt=None, obj_fmt=None, absdiff=None, reldiff=None): ProgressListener.__init__(self, clock, absdiff, reldiff) self._gap_fmt = gap_fmt or "{:.2%}" self._obj_fmt = obj_fmt or "{:.4f}" self._count = 0
[docs] def notify_start(self): super(TextProgressListener, self).notify_start() self._count = 0
[docs] def notify_progress(self, progress_data): self._count += 1 pdata_has_incumbent = progress_data.has_incumbent incumbent_symbol = '+' if pdata_has_incumbent else ' ' # if pdata_has_incumbent: # self._incumbent_count += 1 current_obj = progress_data.current_objective if pdata_has_incumbent: objs = self._obj_fmt.format(current_obj) else: objs = "N/A" # pragma: no cover best_bound = progress_data.best_bound nb_nodes = progress_data.current_nb_nodes remaining_nodes = progress_data.remaining_nb_nodes if pdata_has_incumbent: gap = self._gap_fmt.format(progress_data.mip_gap) else: gap = "N/A" # pragma: no cover raw_time = progress_data.time rounded_time = round(raw_time, 1) print("{0:>3}{7}: Node={4} Left={5} Best Integer={1}, Best Bound={2:.4f}, gap={3}, ItCnt={8} [{6}s]" .format(self._count, objs, best_bound, gap, nb_nodes, remaining_nodes, rounded_time, incumbent_symbol, progress_data.current_nb_iterations))
[docs]class ProgressDataRecorder(ProgressListener): """ A specialized class of ProgressListener, which collects all ProgressData it receives. """ def __init__(self, clock=ProgressClock.Gap, absdiff=None, reldiff=None): super(ProgressDataRecorder, self).__init__(clock, absdiff, reldiff) self._recorded = []
[docs] def notify_start(self): super(ProgressDataRecorder, self).notify_start() # clear recorded data self._recorded = []
[docs] def notify_progress(self, progress_data): self._recorded.append(progress_data)
@property def number_of_records(self): return len(self._recorded) @property def iter_recorded(self): """ Returns an iterator on stored progress data :return: an iterator. """ return iter(self._recorded) @property def recorded(self): """ Returns a copy of the recorded data. :return: """ return self._recorded[:]
[docs]class SolutionListener(ProgressListener): """ The base class for listeners that work on intermediate solutions. To define a custom behavior for a subclass of this class, you need to redefine `notify_solution`. The current progress data is available from the `current_progress_data` property. """ # noinspection PyMethodMayBeStatic def check_solution_clock(self): if not self.clock.listens_to_solution: docplex_fatal('Solution listener requires a solution clock among (Solutions,Objective|Gap), {0} was passed', self.clock) def __init__(self, clock=ProgressClock.Solutions, absdiff=None, reldiff=None): super(SolutionListener, self).__init__(clock, absdiff, reldiff) self.check_solution_clock() def requires_solution(self): return True
[docs] def notify_solution(self, sol): """ Generic method to be redefined by custom listeners to handle intermediate solutions. This method is called by the CPLEX search with an intermediate solution. :param sol: an instance of :class:`docplex.mp.solution.SolveSolution` Note: as this method is called at each node of the MIP search, it may happen that several calls are made with an identical solution, that is, different object instances, but sharing the same variable values. """ pass # pragma: no cover
[docs] def notify_start(self): super(SolutionListener, self).notify_start()
def accept(self, pdata): return pdata.has_incumbent and super(SolutionListener, self).accept(pdata)
[docs]class SolutionRecorder(SolutionListener): """ A specialized implementation of :class:`SolutionListener`, which stores --all-- intermediate solutions. As the listener might be called at different times with identical incumbent values, thus the list of solutions list might well contain identical solutions. """ def __init__(self, clock=ProgressClock.Gap, absdiff=None, reldiff=None): super(SolutionRecorder, self).__init__(clock, absdiff, reldiff) self._solutions = []
[docs] def notify_start(self): """ Redefinition of the generic notify_start() method to clear all data modified by solve(). In this case, clears the list of solutions. """ super(SolutionListener, self).notify_start() self._solutions = []
[docs] def notify_solution(self, sol): """ Redefintion of the generic `notify_solution` method, called by CPLEX. For this class, appends the intermediate solution to the list of stored solutions. """ self._solutions.append(sol)
[docs] def iter_solutions(self): """ Returns an iterator on the stored solutions""" return iter(self._solutions)
@property def number_of_solutions(self): """ Returns the number of stored solutions. """ return len(self._solutions) @property def current_solution(self): # redefinition of generic method `notify_solution`, called by CPLEX. sols = self._solutions return sols[-1] if sols else None
[docs]class FunctionalSolutionListener(SolutionListener): """ A subclass of SolutionListener, which calls a function at each intermediate solution. No exception is caught. """ def __init__(self, solution_fn, clock=ProgressClock.Gap, absdiff=None, reldiff=None): SolutionListener.__init__(self, clock, absdiff, reldiff) self._sol_fn = solution_fn
[docs] def notify_solution(self, sol): self._sol_fn(sol)
[docs]class KpiListener(SolutionListener): """ A refinement of SolutionListener, which computes KPIs at each intermediate solution. Calls the `publish` method with a dicitonary of KPIs. Defaul tis to do nothing. This listener listens to the `Gap` clock. """ objective_kpi_name = '_current_objective' time_kpi_name = '_current_time' def __init__(self, model, clock=ProgressClock.Gap, absdiff=None, reldiff=None): super(KpiListener, self).__init__(clock, absdiff, reldiff) self.model = model
[docs] def publish(self, kpi_dict): """ This method is called at each improving solution, with a dictionay of name, values. :param kpi_dict: a dicitonary of names and KPi values, computed on the intermediate solution. """ pass
[docs] def notify_solution(self, sol): pdata = self.current_progress_data # 1. build a dict from formatted names to kpi values. kpis_as_dict = {kp.name: kp.compute(sol) for kp in self.model.iter_kpis()} # 2. add predefined keys for obj, time. kpis_as_dict[self.objective_kpi_name] = sol.objective_value kpis_as_dict[self.time_kpi_name] = pdata.time self.publish(kpis_as_dict)
[docs]class KpiPrinter(KpiListener): def __init__(self, model, clock=ProgressClock.Gap, absdiff=None, reldiff=None, kpi_format='* ItCnt={3:d} KPI: {1:<{0}} = '): super(KpiPrinter, self).__init__(model, clock, absdiff, reldiff) self.kpi_format = kpi_format
[docs] def publish(self, kpi_dict): try: max_kpi_name_len = max(len(kn) for kn in kpi_dict) # max() raises ValueError on empty except ValueError: max_kpi_name_len = 0 kpi_num_format = self.kpi_format + '{2:.3f}' kpi_str_format = self.kpi_format + '{2!s}' print('-' * (max_kpi_name_len + 15)) itcnt = self.current_progress_data.current_nb_iterations for kn, kv in kpi_dict.items(): if is_number(kv): k_format = kpi_num_format else: k_format = kpi_str_format kps = k_format.format(max_kpi_name_len, kn, kv, itcnt) print(kps)