# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2015, 2019
# --------------------------------------------------------------------------
'''Configuration of Mathematical Programming engine.
The :class:`~Context` is the base class to control the behaviour of solve
engine.
It is not advised to instantiate a Context in your code.
Instead, you should obtain a Context by the following ways:
* create one using Context.make_default_context()
* use the one in your :class:`docplex.mp.model.Model`
``docplex.mp`` configuration files are stored in files named:
* cplex_config.py
* cplex_config_<hostname>.py
When obtaining a Context with make_default_context(), the PYTHONPATH is
searched for the configuration files and read.
Configuration files are evaluated with a `context` object in their
scope, and you set values from this context::
context.cplex_parameters.emphasis.memory = 1
context.cplex_parameters.emphasis.mip = 2
'''
import os
from copy import deepcopy
import shlex
import socket
import sys
import warnings
from os.path import isfile, isabs
from docplex.util.environment import get_environment
from docplex.mp.utils import is_string, open_universal_newline
from docplex.mp.params.cplex_params import get_params_from_cplex_version
from docplex.mp.params.parameters import RootParameterGroup
from docplex.mp.utils import DOcplexException
from docplex.mp.error_handler import docplex_fatal
try:
from docplex_wml.worker.solvehook import get_solve_hook
except ImportError:
get_solve_hook = None
def init_cplex_parameters(x):
    """Return the result of calling ``x.init_cplex_parameters()``.

    Module-level trampoline: it lets the lazy-member table of a context
    refer to a plain function instead of a bound method.
    """
    initializer = x.init_cplex_parameters
    return initializer()
class StreamWithCustomClose(object):
    """Wrapper for streams, so that we can keep track of those that need
    to be closed by us.

    Every attribute that is not defined on the wrapper itself is looked up
    on the wrapped stream, so the wrapper can be used wherever the stream
    is expected. Streams we opened ourselves are recognised by the
    presence of :meth:`custom_close`.
    """

    def __init__(self, target):
        """
        Args:
            target: The stream object to wrap.
        """
        self._target = target

    def __getattr__(self, name):
        # __getattr__ is only called when normal lookup fails. If
        # ``_target`` itself is missing (instance built without __init__,
        # e.g. by copy/pickle), delegating would re-enter this method
        # forever. Special (dunder) names are not delegated either, so that
        # protocol probes such as ``__setstate__`` or ``__getstate__`` are
        # answered for the wrapper and not for the wrapped stream.
        if name == '_target' or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)
        return getattr(self._target, name)

    def custom_close(self):
        """Close the wrapped stream and return whatever its ``close()`` returns."""
        return self._target.close()

    def __str__(self):
        # Not every stream has a ``name`` attribute (io.StringIO does not):
        # fall back to "input" when it is missing or empty.
        return "<{0}>".format(getattr(self._target, "name", None) or "input")
# some utility methods
# def _get_value_as_int(d, option):
# try:
# value = int(d[option])
# except Exception:
# value = None
# return value
#
#
# def _convert_to_int(value):
# if str(value).lower() == 'none':
# return None
# try:
# value = int(value)
# except Exception:
# value = None
# return value
#
#
# def _get_value_as_string(d, option):
# return d.get(option, None)
#
#
# def _get_value_as_boolean(d, option):
# try:
# value = _convert_to_bool(d[option])
# except Exception:
# value = None
# return value
_boolean_map = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
def _convert_to_bool(value):
if value is None:
return None
elif is_string(value):
svalue = str(value).lower()
if svalue == "none":
return None
else:
bvalue = _boolean_map.get(svalue)
if bvalue is not None:
return bvalue
else:
raise ValueError('Not a boolean: {0}'.format(value))
else:
raise ValueError('Not a boolean: {0}'.format(value))
class InvalidSettingsFileError(Exception):
    '''The error raised when an error occurred while reading a settings file.

    Attributes:
        filename: The name of the settings file, if given.
        source: The underlying exception, if given.

    *New in version 2.8*
    '''

    def __init__(self, mesg, filename=None, source=None, *args, **kwargs):
        # Only the message is handed to Exception; the extra positional and
        # keyword arguments are accepted and ignored.
        super(InvalidSettingsFileError, self).__init__(mesg)
        self.source = source
        self.filename = filename
def is_auto_publishing_solve_details(context):
    """Return the auto-publish setting for solve details.

    Lookup order:
      1. ``context.solver.auto_publish.solve_details``
      2. ``context.solver.auto_publish`` when step 1 raises AttributeError
         (e.g. ``auto_publish`` is a plain boolean)
      3. ``False`` when step 2 raises AttributeError too.
    """
    try:
        return context.solver.auto_publish.solve_details
    except AttributeError:
        pass
    try:
        return context.solver.auto_publish
    except AttributeError:
        return False
def check_credentials(context):
    """Check whether the context holds syntactically valid credentials.

    Credentials are considered valid when ``context.url`` and
    ``context.key`` are both set and both are strings.

    Returns:
        (has_credentials, message): ``has_credentials`` is True if the
        context contains syntactically valid credentials. ``message`` is
        set only when the url or the key is present but is not a string;
        it is None in every other case, including missing credentials.
    """
    url = context.url
    key = context.key
    if not url or not key:
        return False, None
    if not is_string(url):
        return False, "DOcplexcloud: URL is not a string: {0!s}".format(url)
    if not is_string(key):
        return False, "API key is not a string: {0!s}".format(key)
    # final check: the key must be a native ``str``
    return isinstance(key, str), None
def has_credentials(context):
    """Return True if the context has valid credentials.

    This is the first element of :func:`check_credentials`; the
    accompanying message is dropped.
    """
    return check_credentials(context)[0]
def print_context(context):
    """Print the content of a context, one dotted path per line.

    Keys starting with an underscore are skipped. Nested dictionaries are
    printed recursively, each one introduced by a line giving its type.
    """
    def _walk(node, prefix):
        for key in sorted(node):
            if key.startswith('_'):
                continue
            child = node.get(key)
            path = ".".join([prefix, key] if prefix else [key])
            type_name = type(child).__name__
            if isinstance(child, (dict, SolverContext)):
                print("%s  # type: %s" % (path, type_name))
                _walk(child, path)
            else:
                print("%s = %s # type: %s" % (path, child, type_name))

    _walk(context, "context")
class BaseContext(dict):
    """Dictionary whose entries are also readable and writable as attributes.

    Reading an attribute that is not a key returns ``None``, except for
    names starting with two underscores, which raise ``AttributeError``.
    """

    def __init__(self, **kwargs):
        """ Create a new context.

        Args:
            List of ``key=value`` to initialize context with.
        """
        super(BaseContext, self).__init__()
        for key in kwargs:
            self.set_attribute(key, kwargs[key])

    def __setattr__(self, name, value):
        # attribute assignment is stored as a dictionary entry
        self.set_attribute(name, value)

    def __getattr__(self, name):
        # only called when normal attribute lookup fails
        return self.get_attribute(name)

    def set_attribute(self, name, value):
        """Store ``value`` under the key ``name``."""
        self[name] = value

    def get_attribute(self, name, default=None):
        """Return the entry ``name``, or ``default`` when it is absent.

        Raises:
            AttributeError: If ``name`` starts with two underscores.
        """
        if name.startswith('__'):
            raise AttributeError
        return self.get(name, default)

    def display(self):
        """Print the context, one dotted path per line.

        Keys starting with an underscore are skipped; nested dictionaries
        are printed recursively.
        """
        def _walk(node, prefix):
            for key in sorted(node):
                if key.startswith('_'):
                    continue
                child = node.get(key)
                path = ".".join([prefix, key] if prefix else [key])
                type_name = type(child).__name__
                if isinstance(child, (dict, SolverContext)):
                    print("%s  # type: %s" % (path, type_name))
                    _walk(child, path)
                else:
                    print("%s = %s # type: %s" % (path, child, type_name))

        _walk(self, "context")
class SolverContext(BaseContext):
    # for internal use

    def __init__(self, **kwargs):
        """Create a solver context and install its default entries.

        The defaults (``log_output``, ``max_threads``, ``auto_publish``,
        ``kpi_reporting``) are assigned after ``kwargs`` are stored, so
        they replace any value given for the same key.
        """
        super(SolverContext, self).__init__(**kwargs)
        self.log_output = False
        self.max_threads = get_environment().get_available_core_count()
        self.auto_publish = create_default_auto_publish_context()
        self.kpi_reporting = BaseContext()
        # local import, as in the rest of this module for docplex.mp parts
        from docplex.mp.progress import ProgressClock
        self.kpi_reporting.filter_level = ProgressClock.Gap

    def __deepcopy__(self, memo):
        # deepcopy is overridden only to make sure that we never deep-copy
        # file descriptors: an io-like ``log_output`` is shared with the
        # copy, every other entry is deep-copied.
        klass = self.__class__
        clone = klass.__new__(klass)
        memo[id(self)] = clone
        for key, item in self.items():
            shared = key == "log_output" and hasattr(item, "write")
            setattr(clone, key, item if shared else deepcopy(item, memo))
        return clone

    @property
    def log_output_as_stream(self):
        """The stream designated by ``log_output``, or None for no output.

        * a string spelling a boolean: ``sys.stdout`` if true, else None
        * ``"stdout"`` / ``"sys.stdout"``: ``sys.stdout``
        * ``"stderr"`` / ``"sys.stderr"``: ``sys.stderr``
        * any other string: a file of that name, opened for writing and
          wrapped in ``StreamWithCustomClose`` (opened on each access)
        * ``True``: ``sys.stdout``
        * an object with a ``write`` attribute: that object itself
        """
        try:
            setting = self.log_output
        except AttributeError:  # pragma: no cover
            return None

        stream = None
        # anything with a ``lower`` attribute is handled as a string
        if hasattr(setting, "lower"):
            lowered = setting.lower()
            if lowered in _boolean_map:
                stream = sys.stdout if _convert_to_bool(lowered) else None
            elif lowered in ("stdout", "sys.stdout"):
                stream = sys.stdout
            elif lowered in ("stderr", "sys.stderr"):
                stream = sys.stderr
            else:
                stream = StreamWithCustomClose(open(setting, 'w'))

        if setting is True:
            return sys.stdout
        if hasattr(setting, "write"):
            # already a stream: hand it back untouched
            return setting
        return stream
class Context(BaseContext):
    """ The context used to control the behavior of solve engine.

    Attributes:
        cplex_parameters: A
           :class:`docplex.mp.params.parameters.RootParameterGroup` to store
           CPLEX parameters.
        solver.auto_publish: If ``True``, a model being solved will automatically
            publish all publishable items (``solve_details``,
            ``result_output``, ``kpis_output``).
        solver.auto_publish.solve_details: if ``True``, solve details are
            published automatically.
        solver.auto_publish.result_output: if not None, the filename where
            solution is saved. This can be a list of filenames if multiple
            solutions are to be published. If True, ``solution.json`` is used.
        solver.auto_publish.kpis_output: if not None, the filename where KPIs
            are saved as a table with KPI name and values. Currently only
            csv files are supported. This can be a list of filenames if
            multiple KPIs files are to be published.
        context.solver.auto_publish.kpis_output_field_name: Name of field for KPI names in KPI output
            table. Defaults to 'Name'
        context.solver.auto_publish.kpis_output_field_value: Name of field for KPI values for KPI output
            table. Defaults to 'Value'
        solver.log_output: This attribute can have the following values:

            * True: When True, logs are printed to sys.stdout.
            * False: When False, logs are not printed.
            * A file-type object: Logs are printed to that file-type object.
        solver.kpi_reporting.filter_level: Specify the filtering level for kpi reporting.
            If None, no filtering is done. Can take values of
            docplex.mp.progress.KpiFilterLevel or a string representation of
            one of the values of this enum (Unfiltered, FilterObjectiveAndBound,
            FilterObjective)
    """

    def __init__(self, **kwargs):
        """Create a context, optionally seeded with quick settings.

        Args:
            **kwargs: Settings forwarded to :func:`update` with
                ``create_missing_nodes=True``. Two keys are special:
                ``_env`` is remembered and used when the CPLEX parameters
                are lazily created, and ``solver_context`` supplies the
                solver sub-context instead of a new ``SolverContext``.
        """
        # store env used for initialization
        self['_env_at_init'] = kwargs.get('_env')
        # map lazy members to f(model) (actually f(self) ) returning the
        # initial value
        self['_lazy_members'] = \
            {'cplex_parameters': init_cplex_parameters}
        # initialize fields
        if 'solver_context' in kwargs:
            solver_context = kwargs.pop('solver_context')
        else:
            solver_context = SolverContext()
        super(Context, self).__init__(solver=solver_context,
                                      cos=BaseContext(),
                                      docplex_tests=BaseContext())
        # update will also ensure compatibility with older kwargs like
        # 'url' and 'api_key'
        self.update(kwargs, create_missing_nodes=True)
        self.model_build_hook = None

    def init_cplex_parameters(self):
        """Build the CPLEX parameters matching the environment's CPLEX version.

        Uses the environment given at construction time (``_env``), or the
        default environment when none was given.
        """
        # we need a local import here so that docplex.mp.environment
        # does not depend on context
        from docplex.mp.environment import Environment
        local_env = self.get('_env_at_init') or Environment.get_default_env()
        cplex_version = local_env.cplex_version
        local_env.check_cplex_version()
        cplex_parameters = get_params_from_cplex_version(cplex_version)
        return cplex_parameters

    def __getattr__(self, name):
        # Lazy members are computed on first access and then stored as
        # regular entries; everything else follows BaseContext rules.
        if name not in self:
            lazy_members = self.get('_lazy_members')
            if lazy_members and name in lazy_members:
                evaluated = lazy_members[name](self)
                self[name] = evaluated
        return self.get_attribute(name)

    def _get_raw_cplex_parameters(self):
        # NON LAZY: may return None
        return self.get('cplex_parameters')

    @staticmethod
    def make_default_context(file_list=None, logger=None, **kwargs):
        """Creates a default context.

        If `file_list` is a string, then it is considered to be the name
        of a config file to be read.

        If `file_list` is a list, it is considered to be a list of names
        of a config files to be read.

        if `file_list` is None or not specified, the following files are
        read if they exist:

            * the PYTHONPATH is searched for the following files:

                * cplex_config.py
                * cplex_config_<hostname>.py

        After the files are read, the ``DOCPLEX_CONTEXT`` environment
        variable, if set, is parsed as shell-like ``name=value`` tokens and
        applied on top. Tokens without ``=`` are ignored.

        Args:
            file_list: The list of config files to read.
            logger: Optional logger receiving informational messages.
            kwargs: context parameters to override. See :func:`docplex.mp.context.Context.update`
        """
        context = Context(**kwargs)
        context.read_settings(file_list=file_list, logger=logger)
        if 'DOCPLEX_CONTEXT' in os.environ:
            values_pairs = []
            for v in shlex.split(os.environ['DOCPLEX_CONTEXT']):
                name, separator, raw_value = v.partition('=')  # max 1 split
                if not separator:
                    # malformed token (no '='): skip it rather than fail
                    # with an IndexError while building the context
                    if logger:
                        logger.warning('Ignoring malformed DOCPLEX_CONTEXT entry: %s' % v)
                    continue
                # convert values to bool if relevant
                value = _boolean_map.get(raw_value.strip().lower(), raw_value)
                values_pairs.append((name, value))
                if logger:
                    logger.info('Setting context value %s to %s (from DOCPLEX_CONTEXT env)' % (name, value))
            context.update_from_list(values_pairs, logger=logger)
        return context

    def copy(self):
        """Makes a deep copy of the context.

        Returns:
            A deep copy of the context.
        """
        return deepcopy(self)

    def clone(self):
        """Makes a deep copy of the context (same as :func:`copy`).

        Returns:
            A deep copy of the context.
        """
        return deepcopy(self)

    def override(self):
        """Return a ``ContextOverride`` built on top of this context."""
        return ContextOverride(self)

    def update_from_list(self, values, logger=None):
        """Set each ``(name, value)`` pair of ``values`` on this context.

        ``name`` may be a dotted path. A pair whose path raises
        AttributeError is skipped, with a warning on ``logger`` if given.
        """
        for name, value in values:
            try:
                self._set_value(self, name, value)
            except AttributeError:
                if logger is not None:
                    logger.warning("Ignoring undefined attribute : {0}".format(name))

    def _set_value(self, root, property_spec, property_value):
        # Walk the dotted path ``property_spec`` from ``root`` and set its
        # last component to ``property_value``.
        property_list = property_spec.split('.')
        property_chain = property_list[:-1]
        to_be_set = property_list[-1]
        o = root
        for c in property_chain:
            o = getattr(o, c)
        try:
            target_attribute = getattr(o, to_be_set)
        except AttributeError:
            target_attribute = None
        if target_attribute is None:
            # Simply set the attribute
            try:
                setattr(o, to_be_set, property_value)
            except DOcplexException:
                pass  # ignore this
        else:
            # try a set_converted_value if it's a Parameter
            try:
                target_attribute.set(property_value)
            except (AttributeError, TypeError):
                # no set(), just setattr
                setattr(o, to_be_set, property_value)

    def update(self, kwargs, create_missing_nodes=False):
        """ Updates this context from child parameters specified in ``kwargs``.

        The following keys are recognized:

            - cplex_parameters: A set of CPLEX parameters to use instead of the parameters defined as ``context.cplex_parameters``.
            - agent: Changes the ``context.solver.agent`` parameter.
                Supported agents include:

                - ``local``: forces the solve operation to use native CPLEX

            - log_output: if ``True``, solver logs are output to stdout.
                If this is a stream, solver logs are output to that stream object.
                Overwrites the ``context.solver.log_output`` parameter.

        Entries whose value is ``None`` are ignored.

        Args:
            kwargs: A ``dict`` containing keyword args to use to update this context.
            create_missing_nodes: When a keyword arg specify a parameter that is not already member of this context,
                creates the parameter if ``create_missing_nodes`` is True.
        """
        for k in kwargs:
            value = kwargs.get(k)
            if value is not None:
                self.update_key_value(k, value,
                                      create_missing_nodes=create_missing_nodes)

    cplex_parameters_key = "cplex_parameters"

    def update_cplex_parameters(self, arg_params):
        # INTERNAL
        # Accepts either a RootParameterGroup (installed as is) or a
        # dictionary of qualified parameter names to values, applied to a
        # copy of the current parameters.
        if isinstance(arg_params, RootParameterGroup):
            self.cplex_parameters = arg_params
        else:
            new_params = self.cplex_parameters.copy()
            # try a dictionary of parameter qualified names, parameter values
            # e.g. cplex_parameters={'mip.tolerances.mipgap': 0.01, 'timelimit': 180}
            try:
                for pk, pv in arg_params.items():
                    p = new_params.find_parameter(key=pk)
                    if not p:
                        docplex_fatal('Cannot find matching parameter from: {0!r}'.format(pk))
                    else:
                        p.set(pv)
                self.cplex_parameters = new_params
            except (TypeError, AttributeError):
                docplex_fatal('Expecting CPLEX parameters or dict, got: {0!r}'.format(arg_params))

    def update_key_value(self, k, value, create_missing_nodes=False, warn=True):
        """Apply one quick setting ``k=value`` to this context.

        Unknown keys are stored as new entries when
        ``create_missing_nodes`` is true; otherwise a warning is issued
        when ``warn`` is true.
        """
        if k == 'cplex_parameters':
            if isinstance(value, RootParameterGroup):
                self.cplex_parameters = value
            else:
                self.update_cplex_parameters(value)
        elif k == 'log_output':
            self.solver.log_output = value
        elif k == 'override':
            self.update_from_list(value.items())
        elif k == '_env':
            # do nothing this is just here to avoid creating too many envs
            pass
        elif k == 'agent':
            self.solver.agent = value
        else:
            if create_missing_nodes:
                self[k] = value
            elif warn:
                warnings.warn("Unknown quick-setting in Context: {0:s}, value: {1!s}".format(k, value),
                              stacklevel=2)

    def read_settings(self, file_list=None, logger=None):
        """Reads settings for a list of files.

        If `file_list` is a string, then it is considered to be the name
        of a config file to be read.

        If `file_list` is a list, it is considered to be a list of names
        of config files to be read.

        if `file_list` is None or not specified, the following files are
        read if they exist:

            * the PYTHONPATH is searched for the following files:

                * cplex_config.py
                * cplex_config_<hostname>.py

        Only existing files whose name ends with ``.py`` are evaluated.

        Args:
            file_list: The list of config files to read.
            logger: Optional logger receiving informational messages.

        Raises:
            InvalidSettingsFileError: If an error occurs while reading a config
                file. *(Since version 2.8)*
        """
        if file_list is None:
            file_list = []
            targets = ['cplex_config.py',
                       'cplex_config_{0}.py'.format(socket.gethostname())
                       ]
            for target in targets:
                if isabs(target) and isfile(target) and target not in file_list:
                    file_list.append(target)
                else:
                    for d in sys.path:
                        f = os.path.join(d, target)
                        if os.path.isfile(f):
                            abs_name = os.path.abspath(f)
                            # store the normalized name: the duplicate
                            # check is made on it, so a directory listed
                            # twice in sys.path under different spellings
                            # yields a single read
                            if abs_name not in file_list:
                                file_list.append(abs_name)
            if not file_list:
                file_list = None  # nothing found: nothing to read

        if isinstance(file_list, str):
            file_list = [file_list]

        if file_list is not None:
            for f in file_list:
                if os.path.isfile(f):
                    if logger:
                        logger.info("Reading settings from %s" % f)
                    if f.endswith(".py"):
                        self.read_from_python_file(f)

    def read_from_python_file(self, filename):
        """Evaluates the content of a Python file containing code to set up
        a context.

        The file is executed with this context bound to the name
        ``context`` and ``__file__`` set to the absolute file name. A file
        that does not exist is silently skipped.

        Args:
            filename (str): The name of the file to evaluate.

        Returns:
            This context.

        Raises:
            InvalidSettingsFileError: If reading or executing the file fails.
        """
        try:
            if os.path.isfile(filename):
                with open_universal_newline(filename, 'r') as f:
                    l = {'context': self,
                         '__file__': os.path.abspath(filename)}
                    # NOTE(security): this runs arbitrary Python code found
                    # in the settings file; only read trusted files.
                    exec(f.read(), globals(), l)
        except Exception as exc:
            raise InvalidSettingsFileError('Error occured while reading file: %s' % filename,
                                           filename=filename, source=exc)
        return self
def create_default_auto_publish_context(defaults=True):
    """Build the ``auto_publish`` sub-context.

    When ``defaults`` is exactly ``True`` and a solve hook is available
    (i.e. when running in a worker), every output gets its default file
    name; otherwise publishing is disabled and every output is ``None``.
    """
    # in a future version, we might want to be able to set individual
    # default values with a dict => that's why we compare to True and False
    if defaults is True and get_solve_hook:
        # Set default values when in a worker
        settings = (
            ('solve_details', True),
            ('result_output', 'solution.json'),
            ('kpis_output', 'kpis.csv'),
            ('kpis_output_field_name', 'Name'),
            ('kpis_output_field_value', 'Value'),
            ('relaxations_output', 'relaxations.csv'),
            ('conflicts_output', 'conflicts.csv'),
        )
    else:
        settings = (
            ('solve_details', False),
            ('result_output', None),
            ('kpis_output', None),
            ('kpis_output_field_name', None),
            ('kpis_output_field_value', None),
            ('relaxations_output', None),
            ('conflicts_output', None),
        )

    auto_publish = BaseContext()
    for name, value in settings:
        setattr(auto_publish, name, value)
    return auto_publish
class ContextOverride(Context):
    """A context layered over an initial context.

    The solver sub-context is a deep copy of the initial one, and the CPLEX
    parameters are copied before being modified, so that changes made here
    leave the initial context untouched.
    """

    def __init__(self, initial_context):
        """
        Args:
            initial_context: The context to override; kept as is.
        """
        soc2 = deepcopy(initial_context.solver)
        super(ContextOverride, self).__init__(solver_context=soc2)
        self._initial_context = initial_context  # unchanged
        self.cplex_parameters = initial_context.cplex_parameters

    def update_key_value(self, k, value, create_missing_nodes=False, warn=True):
        """Apply one quick setting ``k=value`` to this override.

        On top of the keys handled by ``Context``, ``time_limit`` sets the
        ``timelimit`` CPLEX parameter; a value that is negative or cannot
        be converted to an integer is reported and ignored.
        """
        if k == 'cplex_parameters':
            if isinstance(value, RootParameterGroup):
                self.cplex_parameters = value
            else:
                self.update_cplex_parameters(value)

        elif k == 'time_limit':
            time_limit_failed = True
            try:
                time_limit = int(value)
                if time_limit >= 0:
                    # this method makes a local copy for the duration of solve()
                    self.update_cplex_parameters({"timelimit": time_limit})
                    time_limit_failed = False
            except (ValueError, TypeError):
                # int() raises TypeError for objects that are neither
                # numbers nor strings: also an invalid time limit
                pass
            if time_limit_failed:
                print("Invalid time limit: {0!r} - ignored".format(value))

        elif k == 'log_output':
            self.solver.log_output = value
        elif k == 'override':
            self.update_from_list(value.items())
        elif k == '_env':
            # do nothing this is just here to avoid creating too many envs
            pass
        elif k == 'agent':
            self.solver.agent = value
        else:
            if create_missing_nodes:
                # store under the key named by ``k`` (``self.k = value``
                # would store every unknown setting under the literal "k")
                self[k] = value
            elif warn:
                warnings.warn("Unknown quick-setting in Context: {0:s}, value: {1!s}".format(k, value),
                              stacklevel=2)

    def update_cplex_parameters(self, arg_params):
        # INTERNAL
        # Always works on a copy of the current parameters, so that the
        # parameters shared with the initial context are not modified.
        new_params = self.cplex_parameters.copy()
        # try a dictionary of parameter qualified names, parameter values
        # e.g. cplex_parameters={'mip.tolerances.mipgap': 0.01, 'timelimit': 180}
        try:
            for pk, pv in arg_params.items():
                p = new_params.find_parameter(key=pk)
                if not p:
                    docplex_fatal('Cannot find matching parameter from: {0!r}'.format(pk))
                else:
                    p.set(pv)
            self.cplex_parameters = new_params
        except (TypeError, AttributeError):
            docplex_fatal('Expecting CPLEX parameters or dict, got: {0!r}'.format(arg_params))

    def update_from_list(self, key_value_pairs, logger=None):
        """Set each ``(name, value)`` pair of ``key_value_pairs`` on this context.

        ``name`` may be a dotted path. A pair whose path raises
        AttributeError is skipped, with a warning on ``logger`` if given.
        """
        for name, value in key_value_pairs:
            try:
                self._set_value(self, name, value)
            except AttributeError:
                if logger is not None:
                    logger.warning("Ignoring undefined attribute : {0}".format(name))

    def _set_value(self, root, property_spec, property_value):
        # Walk the dotted path ``property_spec`` from ``root`` and set its
        # last component to ``property_value``.
        property_list = property_spec.split('.')
        property_chain = property_list[:-1]
        to_be_set = property_list[-1]
        o = root
        for c in property_chain:
            o = getattr(o, c)
        try:
            target_attribute = getattr(o, to_be_set)
        except AttributeError:
            target_attribute = None
        if target_attribute is None:
            # Simply set the attribute
            try:
                setattr(o, to_be_set, property_value)
            except DOcplexException:
                print('attribute not found: {0}'.format(to_be_set))
        else:
            # try a set_converted_value if it's a Parameter
            try:
                target_attribute.set(property_value)
            except (AttributeError, TypeError):
                # no set(), just setattr
                setattr(o, to_be_set, property_value)
class OverridenOutputContext(object):
    """Context manager redirecting the log output of a model.

    On entry the model's log output is set to the given stream; on exit the
    current stream is flushed (and closed when it offers ``custom_close``),
    then the log settings captured at construction time are put back.
    """

    def __init__(self, mdl, stream):
        self._model = mdl
        self._stream = stream
        # remember both the context setting and the model's actual stream
        self._saved_context_log_output = mdl.context.solver.log_output
        self._saved_log_output_stream = mdl.log_output

    def __enter__(self):
        # change stream
        self._model.set_log_output(self._stream)
        return self._model

    def __exit__(self, exc_type, exc_val, exc_tb):
        model = self._model
        current = model.log_output
        if current:
            # both operations are optional on the stream
            for cleanup in ("flush", "custom_close"):
                try:
                    getattr(current, cleanup)()
                except AttributeError:
                    pass

        previous_stream = self._saved_log_output_stream
        previous_setting = self._saved_context_log_output
        if previous_stream != model.log_output:
            model.set_log_output_as_stream(previous_stream)
        if previous_setting != model.context.solver.log_output:
            model.context.solver.log_output = previous_setting