Source code for docplex.mp.relaxer

# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2015, 2022
# --------------------------------------------------------------------------

from collections import defaultdict, namedtuple
from docplex.mp.constants import RelaxationMode

from docplex.mp.utils import is_function, apply_thread_limitations, _to_list
from docplex.mp.priority import Priority
from docplex.mp.constr import AbstractConstraint
from docplex.mp.error_handler import docplex_fatal
from docplex.mp.sdetails import SolveDetails

from docplex.util import as_df
from docplex.mp.publish import PublishResultAsDf

try:
    import pandas as pd
except ImportError:  # pragma: no cover
    pd = None

TOutputTables = namedtuple('TOutputTables', ['Constraint', 'Priority', 'Amount'])


def to_output_table(relaxer, use_pd=True):
    if pd and use_pd:
        return as_df(relaxer)
    else:
        result = []
        prioritizer = relaxer.prioritizer
        for ct, relaxed in relaxer.iter_relaxations():
            result.append(TOutputTables(ct.name,
                                        prioritizer.get_priority_internal(ct).name,
                                        relaxed))
        return result


# noinspection PyAbstractClass
[docs]class Prioritizer(object): ''' Abstract base class for prioritizers. This class acts like a functor to be called on each model constraint. ''' def __init__(self, override=False): self._override = override def get_priority(self, ct): raise NotImplementedError("base class") # pragma: no cover def get_priority_internal(self, ct): ct_prio = ct.priority if not self._override and ct_prio: return ct_prio else: return self.get_priority(ct)
class IRelaxationListener(object): # INTERNAL # ''' Base class for relaxation listeners.''' def notify_start_relaxation(self, priority, relaxables): ''' This method is called at each step of the relaxation loop.''' pass # pragma: no cover def notify_failed_relaxation(self, priority, relaxables): ''' This method is called when a relaxation attempt fails.''' pass # pragma: no cover def notify_successful_relaxation(self, priority, relaxables, relaxed_obj_value, violations): ''' This method is called when a relaxation succeeds.''' pass # pragma: no cover class VerboseRelaxationListener(IRelaxationListener): # INTERNAL # ''' A default implementation of the listener, which prints messages.''' def __init__(self): IRelaxationListener.__init__(self) self.relaxation_count = 0 def notify_start_relaxation(self, priority, relaxables): self.relaxation_count += 1 print("-> relaxation #{0} starts with priority: {1!s}, #relaxables={2:d}" .format(self.relaxation_count, priority.name, len(relaxables))) def notify_failed_relaxation(self, priority, relaxables): print("<- relaxation #{0} fails, priority: {1!s}, #relaxables={2:d}" .format(self.relaxation_count, priority.name, len(relaxables))) def notify_successful_relaxation(self, priority, relaxables, obj, violations): print("<- relaxation #{0} succeeds: priority: {1!s}, #relaxables={2:d}, obj={3}, #relaxations={4}". format(self.relaxation_count, priority.name, len(relaxables), obj, len(violations)))
[docs]class NamedPrioritizer(Prioritizer): # INTERNAL # """ Basic prioritizer that relaxes any constraint with a name. # # More precisely, the prioritizer logic works as follows: # # - If the constraint has a ``priority`` attribute, then it is assumed # to hold a priority instance and use it. # - If the constraint has a user-defined name, relax it with MEDIUM priority. # - Otherwise, the constraint is not to be relaxed (that is, it is assigned MANDATORY priority). # # """ def __init__(self, priority=Priority.MEDIUM, override=False): Prioritizer.__init__(self, override) self._priority = priority def get_priority(self, ct): return self._priority if ct.has_user_name() else Priority.MANDATORY
[docs]class UniformPrioritizer(Prioritizer): # INTERNAL # """ Constraint prioritizer that relaxes all constraints. # # This prioritizer assigns MEDIUM priority to all constraints # unless an explicit priority has been set by the user. # # """ def __init__(self, priority=Priority.MEDIUM, override=False): Prioritizer.__init__(self, override) self._priority = priority def get_priority(self, ct): return self._priority
[docs]class MatchNamePrioritizer(Prioritizer): # INTERNAL # """ Constraint prioritizer based on constraint names. # # This prioritizer analyzes constraint names for strings that match priority names. # If a constraint contains a string which matches a priority name, # then it is assigned this priority. # # If a constraint has a priority explicitly set by the ``priority`` attribute, # this user priority is returned. # # Note: # 1. Unnamed constraints are considered as non-matches. # 2. String matching is not case sensitive. # # For example: a constraint named "ct_salary_low" will be considered as having the priority LOW. # """ def __init__(self, priority_for_unnamed=Priority.MANDATORY, priority_for_non_matches=Priority.MANDATORY, case_sensitive=False, override=False): Prioritizer.__init__(self, override) assert isinstance(priority_for_unnamed, Priority) assert isinstance(priority_for_non_matches, Priority) self.priority_for_unnamed_cts = priority_for_unnamed self.priority_for_non_matching_cts = priority_for_non_matches self.priority_by_symbol = {prio.name.lower(): prio for prio in Priority} self._is_case_sensitive = bool(case_sensitive)
[docs] def get_priority(self, ct): ''' Looks for known priority names inside constraint names. ''' if not ct.has_user_name(): return self.priority_for_unnamed_cts else: ctname = ct.name ctname_to_match = ctname if self._is_case_sensitive else ctname.lower() best_matched = 0 best_matching_priority = self.priority_for_non_matching_cts for (prio_symbol, prio) in self.priority_by_symbol.items(): if ctname_to_match.find(prio_symbol) >= 0: matched = len(prio_symbol) # longer matches are preferred # e.g. very_low and low both match in very_low_ctxxx # but the prioritizer will return very_low as the match is longer. if matched > best_matched: best_matched = matched best_matching_priority = prio return best_matching_priority
[docs]class MappingPrioritizer(Prioritizer): # INTERNAL # """ # Constraint prioritizer based on a dictionary of constraints and priorities. # # Initialized from a dictionary and an optional default priority. # # Args: # priority_mapping: A dictionary with constraints as keys and priorities as values. # # default_priority: An optional priority, used when a constraint is not explicitly mentioned # in the mapping. The default value is MANDATORY, meaning that any constraint not mentioned # in the mapping will not be relaxed. # """ def __init__(self, priority_mapping, default_priority=Priority.MANDATORY, override=False): Prioritizer.__init__(self, override) # --- typecheck that this dict is a a {ct: prio} mapping. if not isinstance(priority_mapping, dict): raise TypeError for k, v in priority_mapping.items(): if not isinstance(k, AbstractConstraint): raise TypeError if not hasattr(v, 'cplex_preference'): raise TypeError # --- self._mapping = priority_mapping self._default_priority = default_priority def get_priority(self, ct): # return the dict's value for ct if nay, else its own priority or the default. return self._mapping.get(ct, ct.priority or self._default_priority)
[docs]class FunctionalPrioritizer(Prioritizer): def __init__(self, fn, override=False): Prioritizer.__init__(self, override) self._prioritize_fn = fn def get_priority(self, ct): return self._prioritize_fn(ct)
# internal named tuples _TRelaxableGroup = namedtuple("_TRelaxableGroup", ["preference", "relaxables"]) _TParamData = namedtuple('_TParamInfo', ['short_name', 'default_value', 'accessor'])
[docs]class Relaxer(PublishResultAsDf, object): ''' This class is an abstract algorithm, in the sense that it operates on interfaces. It takes a prioritizer, which an implementation of ``ConstraintPrioritizer``. For convenience, predefined prioritizer types are accessible through names: - `all` relaxes all constraints using a MEDIUM priority; this is the default. - `named` relaxes all constraints with a user name but not the others. - `match` looks for priority names within constraint names; unnamed constraints are not relaxed. Note: All predefined prioritizers apply various forms of logic, but, when a constraint has been assigned a priority by the user, this priority is always used. For example, the `named` prioritizer relaxes all named constraints with MEDIUM, but if an unnamed constraint was assigned a HIGH priority, then HIGH will be used. See Also: :class:`docplex.mp.priority.Priority` ''' default_precision = 1e-5 _default_mode = RelaxationMode.OptSum def __init__(self, prioritizer='all', verbose=False, precision=default_precision, override=False, **kwargs): self.output_table_customizer = kwargs.get('output_processing') # docplex_wml tables, internal self.output_table_property_name = 'relaxations_output' self.default_output_table_name = 'relaxations.csv' self.output_table_using_df = True # if pandas is available of course self._precision = precision # --- if hasattr(prioritizer, 'get_priority'): self._prioritizer = prioritizer elif prioritizer == 'match': self._prioritizer = MatchNamePrioritizer(override=override) elif isinstance(prioritizer, dict): self._prioritizer = MappingPrioritizer(priority_mapping=prioritizer, override=override) elif prioritizer == 'named': self._prioritizer = NamedPrioritizer() elif prioritizer is None or prioritizer == 'all': self._prioritizer = UniformPrioritizer(override=override) elif is_function(prioritizer): self._prioritizer = FunctionalPrioritizer(prioritizer, override=override) else: print("Cannot deduce a prioritizer from: {0!r} - expecting \"name\"|\"default\"| dict", prioritizer) raise TypeError self._cumulative = True self._listeners = [] # result data self._last_relaxation_status = False self._last_relaxation_objective = -1e+75 self._last_successful_relaxed_priority = Priority.MANDATORY self._last_relaxation_details = SolveDetails.make_dummy() self._relaxations = {} self._verbose = verbose self._verbose_listener = VerboseRelaxationListener() if self._verbose: self.add_listener(self._verbose_listener) @property def prioritizer(self): return self._prioritizer def set_verbose(self, is_verbose): if is_verbose != self._verbose: self._verbose = is_verbose self.set_verbose_listener_from_flag(is_verbose) def set_verbose_listener_from_flag(self, is_verbose): if is_verbose: self.add_listener(self._verbose_listener) else: self.remove_listener(self._verbose_listener) def get_verbose(self): return self._verbose verbose = property(get_verbose, set_verbose) def _check_successful_relaxation(self): if not self._last_relaxation_status: docplex_fatal("No relaxed solution is present") def _reset(self): # INTERNAL self._last_relaxation_status = False self._last_relaxation_objective = -1e+75 self._last_successful_relaxed_priority = Priority.MANDATORY self._relaxations = {} def _accept_violation(self, violation): ''' The filter method which accepts or rejects a violation.''' return 0 == self._precision or abs(violation) >= self._precision def add_listener(self, listener): # INTERNAL # """ Adds a relaxation listener. # # Args: # listener: The new listener to add. If ``listener`` is not an # instance of ``IRelaxationListener``, it is ignored. # # See Also: # :class:`IRelaxationListener` # """ if isinstance(listener, IRelaxationListener): self._listeners.append(listener) def remove_listener(self, listener): # INTERNAL # """ Removes a relaxation listener. # # Args: # listener: The listener to remove. # """ if listener in self._listeners: self._listeners.remove(listener) def clear_listeners(self): # INTERNAL # """ Removes all relaxation listeners. # """ self._listeners = [] _param_data = {}
[docs] def relax(self, mdl, relax_mode=None, **kwargs): """ Runs the relaxation loop. Args: mdl: The model to be relaxed. relax_mode: the relaxation mode. Accept either None (in which case the default mode is used, or an instance of ``RelaxationMode`` enumerated type, or a string that can be translated to a relaxation mode. kwargs: Accepts named arguments similar to ``solve``. Returns: If the relaxation succeeds, the method returns a solution object, an instance of ``SolveSolution``; otherwise returns None. See Also: :func:`docplex.mp.model.Model.solve`, :class:`docplex.mp.solution.SolveSolution`, :class:`docplex.mp.constants.RelaxationMode` """ self._reset() # 1. build a dir {priority : cts} priority_map = defaultdict(list) nb_prioritized_cts = 0 mdl_priorities = set() mandatory_justifier = None nb_mandatories = 0 for ct in mdl.iter_constraints(): prio = self._prioritizer.get_priority_internal(ct) if prio.is_mandatory(): nb_mandatories += 1 if mandatory_justifier is None: mandatory_justifier = ct else: priority_map[prio].append(ct) nb_prioritized_cts += 1 mdl_priorities.add(prio) sorted_priorities = sorted(list(mdl_priorities), key=lambda p: p.value) if 0 == nb_prioritized_cts: mdl.error("Relaxation algorithm found no relaxable constraints - exiting") return None if nb_mandatories: assert mandatory_justifier is not None s_justifier = mandatory_justifier.to_readable_string() mdl.warning('{0} constraint(s) will not be relaxed (e.g.: {1})', nb_mandatories, s_justifier) temp_relax_verbose = kwargs.pop('verbose', False) if temp_relax_verbose != self._verbose: # install/deinstall listener for this relaxation only self.set_verbose_listener_from_flag(temp_relax_verbose) # relaxation loop all_groups = [] all_relaxable_cts = [] is_cumulative = self._cumulative # -- relaxation mode if relax_mode is None: used_relax_mode = self._default_mode else: used_relax_mode = RelaxationMode.parse(relax_mode) if not mdl.is_optimized(): used_relax_mode = RelaxationMode.get_no_optimization_mode(used_relax_mode) # save this for restore later saved_context_log_output = mdl.context.solver.log_output saved_log_output_stream = mdl.log_output saved_context = mdl.context # take into account local argument overrides relax_context = mdl.prepare_actual_context(**kwargs) transient_engine = False relax_engine = mdl.get_engine() if temp_relax_verbose: print("-- starting relaxation. mode: {0!s}, precision={1}".format(used_relax_mode.name, self._precision)) try: # mdl.context has been saved in saved_context above mdl.context = relax_context mdl.set_log_output(mdl.context.solver.log_output) # engine parameters, if needed to parameters = apply_thread_limitations(relax_context) mdl._apply_parameters_to_engine(parameters) relaxed_sol = None for prio in sorted_priorities: if prio in priority_map: cts = priority_map[prio] if not cts: # this should not happen... continue # pragma: no cover pref = prio.cplex_preference # build a new group relax_group = _TRelaxableGroup(pref, cts) # relaxing new batch of cts: if not is_cumulative: # pragma: no cover # if not cumulative reset the groupset all_groups = [relax_group] all_relaxable_cts = cts else: all_groups.append(relax_group) all_relaxable_cts += cts # at this stage we have a sequence of groups # a group is itself a sequence of two components # - a preference factor # - a sequence of constraints for ls in self._listeners: ls.notify_start_relaxation(prio, all_relaxable_cts) # ---- # call the engine. # --- try: relaxed_sol = relax_engine.solve_relaxed(mdl, prio.name, all_groups, used_relax_mode) finally: self._last_relaxation_details = relax_engine.get_solve_details() # --- if relaxed_sol is not None: relax_obj = relaxed_sol.objective_value self._last_successful_relaxed_priority = prio self._last_relaxation_status = True self._last_relaxation_objective = relaxed_sol.objective_value # filter irrelevant relaxations below some threshold for ct in all_relaxable_cts: raw_infeas = relaxed_sol.get_infeasibility(ct) if self._accept_violation(raw_infeas): self._relaxations[ct] = raw_infeas if not self._relaxations: mdl.warning( "Relaxation of model `{0}` found one relaxed solution, but no relaxed constraints - check".format( mdl.name)) for ls in self._listeners: ls.notify_successful_relaxation(prio, all_relaxable_cts, relax_obj, self._relaxations) # now get out break else: # TODO: maybe issue a warning that relaxation has failed? # relaxation has failed, notify the listeners for ls in self._listeners: ls.notify_failed_relaxation(prio, all_relaxable_cts) mdl.notify_solve_relaxed(relaxed_sol, relax_engine.get_solve_details()) # write relaxation table.write_output_table() handles everything related to # whether the table should be published etc... if self.is_publishing_output_table(mdl.context): output_table = to_output_table(self, self.output_table_using_df) self.write_output_table(output_table, mdl.context) finally: # --- restore context, log_output if set. if saved_log_output_stream != mdl.log_output: mdl.set_log_output_as_stream(saved_log_output_stream) if saved_context_log_output != mdl.context.solver.log_output: mdl.context.solver.log_output = saved_context_log_output mdl.context = saved_context if transient_engine: # pragma: no cover del relax_engine if temp_relax_verbose != self._verbose: # realign listener with flag self.set_verbose_listener_from_flag(self._verbose) return relaxed_sol
[docs] def iter_relaxations(self): """ Iterates on relaxations. Relaxations are built as a dictionary with constraints as keys and numeric violations as values, so this iterator returns ``(ct, violation)`` pairs. """ self._check_successful_relaxation() return iter(self._relaxations.items())
[docs] def relaxations(self): """ Returns a dictionary with all relaxed constraints. Returns: A dictionary where the keys are the relaxed constraints, and the values are the numerical slacks. """ return self._relaxations.copy()
def __as_df__(self): ''' Returns a pandas.DataFrame with all relaxed constraint. Returns: A pandas.DataFrame which columns are: - Constraint: the constraint name - Priority: The priority of the constraint - Amount: The amount of relaxation ''' if not pd: # pragma: no cover raise NotImplementedError('Cannot convert results to DataFrame, pandas is not available') columns = ['Constraint', 'Priority', 'Amount'] results_list = [] prioritizer = self.prioritizer for ct, relaxed in self.iter_relaxations(): results_list.append({'Constraint': ct.name, 'Priority': prioritizer.get_priority_internal(ct).name, 'Amount': relaxed}) df = pd.DataFrame(results_list, columns=columns) return df def get_total_relaxation(self): self._check_successful_relaxation() return sum(abs(v) for v in self._relaxations.values()) @property def total_relaxation(self): return self.get_total_relaxation() def print_information(self): self._check_successful_relaxation() print("* number of relaxations: {0}".format(len(self._relaxations))) for rct, relaxation in self.iter_relaxations(): arg = rct.name if rct.has_user_name() else str(rct) print(" - relaxed: {0}, with relaxation: {1}".format(arg, relaxation)) print("* total absolute relaxation: {0}".format(self.get_total_relaxation())) def as_dict(self): rxd = {rct.name or str(rct): relaxed for rct, relaxed in self.iter_relaxations()} return rxd @property def relaxed_objective_value(self): """ Returns the objective value of the relaxed solution. Raises: DOCplexException If the relaxation has not been successful. """ self._check_successful_relaxation() return self._last_relaxation_objective @property def number_of_relaxations(self): """ This property returns the number of relaxations found. """ return len(self._relaxations)
[docs] def get_relaxation(self, ct): """ Returns the infeasibility computed for this constraint. Args: ct: A constraint. Returns: The amount by which the constraint has been relaxed by the relaxer. The method returns 0 if the constraint has not been relaxed. """ self._check_successful_relaxation() return self._relaxations.get(ct, 0)
[docs] def is_relaxed(self, ct): ''' Returns true if the constraint ``ct`` has been relaxed Args: ct: The constraint to check. Returns: True if the constraint has been relaxed, else False. ''' self._check_successful_relaxation() return ct in self._relaxations
@classmethod def run_feasopt(cls, model, relaxables, relax_mode): relaxable_list = _to_list(relaxables) groups = [] checker = model._checker try: for pref, ctseq in relaxable_list: checker.typecheck_num(pref) cts = _to_list(ctseq) for ct in cts: checker.typecheck_constraint(ct) groups.append((pref, cts)) except ValueError: model.fatal("expecting container with (preference, constraints), got: {0!s}", relaxable_list) feasible = model.get_engine().solve_relaxed(mdl=model, relaxable_groups=groups, prio_name='feasopt', relax_mode=relax_mode) return feasible