# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# (c) Copyright IBM Corp. 2015, 2016
# --------------------------------------------------------------------------
import warnings
from collections import namedtuple, defaultdict

from import is_string
from import ComparisonType, VarBoundType
from import str_maxed
from import PublishResultAsDf

TConflictConstraint = namedtuple("_TConflictConstraint", ["name", "element", "status"])

def trim_field(element):
    return str_maxed(element, maxlen=50)

def to_output_table(conflicts, use_df=True):
    # Returns the output tables, as df if pandas is available or as a list
    # of named tuple ['Type', 'Status', 'Name', 'Expression']
    columns = ['Type', 'Status', 'Name', 'Expression']
    TOutputTables = namedtuple('TOutputTables', columns)

    def convert_to_pandas_df(c):
        return {'Type': 'Constraint',
                'Status': if c.status is not None else '',
                'Name': or '',
                'Expression': trim_field(c.element)}

    def convert_to_namedtuples(c):
        return TOutputTables('Constraint',
                    or '',

    pandas = None
    if use_df:
            import pandas
        except ImportError:  # pragma: no cover
            print("pandas module not found...")
            pandas = None

    data_converter = convert_to_pandas_df if pandas and use_df else convert_to_namedtuples
    output_data = list(map(data_converter, conflicts))

    if use_df:
        return pandas.DataFrame(columns=columns, data=output_data)
        return output_data

[docs]class ConflictRefinerResult(object): """ This class contains all conflicts as returned by the conflict refiner. A conflict refiner result contains a list of named tuples of type ``TConflictConstraint``, the fields of which are: - an enumerated value of type ```` that indicates the conflict status type (Excluded, Possible_member, Member...). - the name of the constraint or None if the constraint corresponds to a variable lower or upper bound. - a modeling object involved in the conflict: can be either a constraint or a wrapper representing a variable upper or lower bound. *New in version 2.11* """ def __init__(self, conflicts, refined_by=None): self._conflicts = conflicts assert refined_by is None or is_string(refined_by) self._refined_by = refined_by @property def refined_by(self): ''' Returns a string indicating how the conflicts were produced. - If the conflicts are created by a program, this field returns None. - If the conflicts originated from a local CPLEX run, this method returns 'cplex_local'. - If the conflicts originated from a DOcplexcloud run, this method returns 'cplex_cloud'. Returns: A string, or None. ''' return self._refined_by def __iter__(self): return self.iter_conflicts() def __len__(self): """ Redefintion of maguic method __len__. Allows calling len() on an instance of ConflictRefinerResult to get the number of conflicts :return: the number of conflicts. """ return len(self._conflicts)
[docs] def iter_conflicts(self): """ Returns an iterator on conflicts (named tuples) :return: an iterator """ return iter(self._conflicts)
@property def number_of_conflicts(self): """ This property returns the number of conflicts. """ return len(self._conflicts)
[docs] def display(self): """ Displays all conflicts. """ print('conflict(s): {0}'.format(self.number_of_conflicts)) for conflict in self.iter_conflicts(): st = conflict.status elt = conflict.element if hasattr(conflict.element, 'as_constraint'): ct = conflict.element.as_constraint() label = elt.short_typename else: ct = elt label = ct.__class__.__name__ print(" - status: {1}, {0}: {2!s}".format(label,, ct.to_readable_string()))
[docs] def display_stats(self): """ Displays statistics on conflicts. Display show many conflict elements per type. """ def elt_typename(elt): try: return elt.short_typename.lower() except AttributeError: # pragma: no cover return elt.__class__.__name__.lower() ncf = self.number_of_conflicts print('conflict{1}: {0}'.format(ncf, "s" if ncf > 1 else "")) cf_stats = defaultdict(lambda: 0) for conflict in self.iter_conflicts(): elt_type = elt_typename(conflict.element) cf_stats[elt_type] += 1 for eltt, count in cf_stats.items(): if count: print(" - {0}{2}: {1}".format(eltt, count, ("s" if count > 1 else "")))
def as_output_table(self, use_df=True): return to_output_table(self, use_df)
[docs] def print_information(self): """ Similar as `display_stats` """ self.display_stats()
class VarBoundWrapper(object): # INTERNAL def __init__(self, dvar): self._var = dvar @property def var(self): return self._var @property def index(self): return self._var.index @property def short_typename(self): # pragma: no cover return "Variable Bound" def as_constraint(self): # pragma: no cover raise NotImplementedError def as_constraint_from_symbol(self, op_symbol): self_var = self.var var_lb = op = ComparisonType.cplex_ctsense_to_python_op(op_symbol) ct = op(self_var, var_lb) return ct @classmethod def make_wrapper(cls, var, bound_type): if bound_type == VarBoundType.LB: return VarLbConstraintWrapper(var) elif bound_type == VarBoundType.UB: return VarUbConstraintWrapper(var) else: return None
[docs]class VarLbConstraintWrapper(VarBoundWrapper): """ This class is a wrapper for a model variable and its associated lower bound. Instances of this class are created by the ``refine_conflict`` method when the conflict involves a variable lower bound. Each of these instances is then referenced by a ``TConflictConstraint`` namedtuple in the conflict list returned by ``refine_conflict``. To check whether the lower bound of a variable causes a conflict, wrap the variable and include the resulting constraint in a ConstraintsGroup. """ @property def short_typename(self): return "Lower Bound" def as_constraint(self): return self.as_constraint_from_symbol('G')
[docs]class VarUbConstraintWrapper(VarBoundWrapper): """ This class is a wrapper for a model variable and its associated upper bound. Instances of this class are created by the ``refine_conflict`` method when the conflict involves a variable upper bound. Each of these instances is then referenced by a ``TConflictConstraint`` namedtuple in the conflict list returned by ``refine_conflict``. To check whether the upper bound of a variable causes a conflict, wrap the variable and include the resulting constraint in a ConstraintsGroup. """ @property def short_typename(self): return "Upper Bound" def as_constraint(self): return self.as_constraint_from_symbol('L')
[docs]class ConstraintsGroup(object): """ This class is a container for the definition of a group of constraints. A preference for conflict refinement is associated to the group. Groups may be assigned preference. A group with a higher preference is more likely to be included in the conflict. A negative value specifies that the corresponding group should not be considered in the computation of a conflict. In other words, such groups are not considered part of the model. Groups with a preference of 0 (zero) are always considered to be part of the conflict. Args: preference: A floating-point number that specifies the preference for the group. The higher the number, the higher the preference. """ __slots__ = ('_preference', '_cts') def __init__(self, preference=1.0, cts=None): self._preference = preference self._cts = [] if cts is not None: self.add_constraints(cts)
[docs] @classmethod def from_var(cls, dvar, bound_type, pref): """ A class method to build a group fromone variable. :param dvar: The variable whose bound is part of the conflict. :param bound_type: An enumerated value of type `VarBoundType` :param pref: a numerical preference. :return: an instance of ConstraintsGroup. See Also: :class:`` """ cgg = cls(preference=pref, cts=VarBoundWrapper.make_wrapper(dvar, bound_type)) return cgg
@property def preference(self): return self._preference def add_one(self, x): if x is not None: self._cts.append(x) def add_constraint(self, ct): self._cts.append(ct) def add_constraints(self, cts): try: for ct in cts: self.add_one(ct) except TypeError: # not iterable. self.add_one(cts) def iter_constraints(self): return iter(self._cts)
[docs]class ConflictRefiner(PublishResultAsDf, object): ''' This class is an abstract algorithm; it operates on interfaces. A conflict is a set of mutually contradictory constraints and bounds within a model. Given an infeasible model, the conflict refiner can identify conflicting constraints and bounds within it. CPLEX refines an infeasible model by examining elements that can be removed from the conflict to arrive at a minimal conflict. ''' # static variables for output output_table_property_name = 'conflicts_output' default_output_table_name = 'conflicts.csv' output_table_using_df = True def __init__(self, output_processing=None): self.output_table_customizer = output_processing @classmethod def _make_atomic_ct_groups(cls, mdl_iter, pref): # returns a list of singleton groups from a model iterator and a numerical preference. lcgrps = [ConstraintsGroup(pref, ct) for ct in mdl_iter] return lcgrps
[docs] @classmethod def var_bounds(cls, mdl, pref=4.0, include_infinity_bounds=True): """ Returns a list of singleton groups with variable bounds. This method a list of ConstraintGroup objects, each of which contains a variabel bound. It replicate sthe behavior of the CPLEX interactive optimizer, that is, it returns - lower bounds for non-binary variables if different from 0 - upper bound for non-binary-variables if non-default For binary variables, bounds are not considered, unless the variable is bound; more precisely: - lower bound is included if >= 0.5 - upper bound is included if <= 0.5 :param mdl: The model being analyzed for conflicts, :param pref: the preference for variable bounds, the defaut is 4.0 :param include_infinity_bounds: a flag indicating whether infi :return: a list of `ConstraintsGroup` objects. """ grps = [] mdl_inf = mdl.infinity for dv in mdl.iter_variables(): lb, ub =, dv.ub if not dv.is_binary(): if lb != 0: if include_infinity_bounds or lb > - mdl_inf: grps.append(ConstraintsGroup.from_var(dv, VarBoundType.LB, pref)) if include_infinity_bounds or ub < mdl_inf: grps.append(ConstraintsGroup.from_var(dv, VarBoundType.UB, pref)) else: if lb >= 0.5: grps.append(ConstraintsGroup.from_var(dv, VarBoundType.LB, pref)) if ub <= 0.5: grps.append(ConstraintsGroup.from_var(dv, VarBoundType.UB, pref)) return grps
@classmethod def linear_constraints(cls, mdl, pref=2.0): return cls._make_atomic_ct_groups(mdl.iter_linear_constraints(), pref) @classmethod def logical_constraints(cls, mdl, pref=1.0): return cls._make_atomic_ct_groups(mdl.iter_logical_constraints(), pref) @classmethod def quadratic_constraints(cls, mdl, pref=1.0): return cls._make_atomic_ct_groups(mdl.iter_quadratic_constraints(), pref)
[docs] def refine_conflict(self, mdl, preferences=None, groups=None, display=False, **kwargs): """ Starts the conflict refiner on the model. Args: mdl: The model to be relaxed. preferences: A dictionary defining constraint preferences. groups: A list of ConstraintsGroups. display: a boolean flag (default is True); if True, displays the result at the end. kwargs: Accepts named arguments similar to solve. Returns: An object of type `ConflictRefinerResut` which holds all information about the minimal conflict. See Also: :class:`ConflictRefinerResult` """ if mdl.has_multi_objective(): mdl.fatal("Conflict refiner is not supported for multi-objective") # take into account local argument overrides context = mdl.prepare_actual_context(**kwargs) # log stuff saved_context_log_output = mdl.context.solver.log_output saved_log_output_stream = mdl.log_output try: mdl.set_log_output(context.solver.log_output) if mdl.environment.has_cplex: results = self._refine_conflict_local(mdl, context, preferences, groups) else: return mdl.fatal("CPLEX runtime not found: cannot run conflict refiner") # write conflicts table.write_output_table() handles everything related to # whether the table should be published etc... if self.is_publishing_output_table(mdl.context): self.write_output_table(results.as_output_table(self.output_table_using_df), mdl.context) if display: results.display_stats() return results finally: if saved_log_output_stream != mdl.log_output: mdl.set_log_output_as_stream(saved_log_output_stream) if saved_context_log_output != mdl.context.solver.log_output: mdl.context.solver.log_output = saved_context_log_output
# noinspection PyMethodMayBeStatic def _refine_conflict_local(self, mdl, context, preferences=None, groups=None): parameters = context.cplex_parameters self_engine = mdl.get_engine() return self_engine.refine_conflict(mdl, preferences, groups, parameters)
[docs] @staticmethod def display_conflicts(conflicts): """ This method displays a formatted representation of the conflicts that are provided. Args: conflicts: An instance of ``ConflictRefinerResult`` """ warnings.warn("deprecated: use ConflictRefinerresult.display", DeprecationWarning) conflicts.display()