# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2015, 2019
# --------------------------------------------------------------------------
from docplex.mp.progress import SolutionListener, ProgressClock
from docplex.mp.qprogress import QProgressListener
from docplex.util.environment import get_environment
try:
import pandas as pd
except ImportError:
pd = None
from docplex.util.csv_utils import write_csv, write_table_as_csv
class _BarrierProgressPubisher(QProgressListener):
def __init__(self, publish_hook=None):
super().__init__()
self.publish_hook = publish_hook
def notify_progress(self, qprogress_data):
env = get_environment()
# 1. Start with empty table
name_values = {}
# new stats for https://github.ibm.com/IBMDecisionOptimization/dd-planning/issues/2491
if env.is_dods():
name_values['STAT.cplex.solve.iterationCount'] = qprogress_data.current_nb_iterations
name_values['STAT.cplex.solve.elapsedTime'] = qprogress_data.time
name_values['STAT.cplex.solve.primalObjective'] = qprogress_data.primal_objective_value
name_values['STAT.cplex.solve.dualObjective'] = qprogress_data.dual_objective_value
name_values['STAT.cplex.solve.primalInfeasibility'] = qprogress_data.primal_infeas
name_values['STAT.cplex.solve.dualInfeasibility'] = qprogress_data.dual_infeas
# publish ..or perish...
if self.publish_hook is not None:
self.publish_hook(name_values)
class _KpiRecorder(SolutionListener):
def __init__(self, model, clock=ProgressClock.Gap,
publish_hook=None,
kpi_publish_format=None, absdiff=None, reldiff=None):
super(_KpiRecorder, self).__init__(clock, absdiff, reldiff)
self.model = model
self._context = model.context
self.publish_hook = publish_hook
self.kpi_publish_format = kpi_publish_format or 'KPI.%s'
self.publish_name_fn = lambda kn: self.kpi_publish_format % kn
# stored dictionaries of kpi values (name: value)
self._kpi_dicts = []
def notify_start(self):
super(_KpiRecorder, self).notify_start()
self._kpi_dicts = []
@property
def nb_reported(self):
return len(self._kpi_dicts)
def notify_solution(self, sol):
env = get_environment()
pdata = self.current_progress_data
context = self._context
# 1. Start with empty table
name_values = {}
# 2. add predefined keys for obj, time.
name_values['PROGRESS_CURRENT_OBJECTIVE'] = sol.objective_value
# 3. store it (why???)
self._kpi_dicts.append(name_values)
# new stats for https://github.ibm.com/IBMDecisionOptimization/dd-planning/issues/2491
if env.is_dods():
name_values['PROGRESS_GAP'] = pdata.mip_gap
name_values['PROGRESS_BEST_OBJECTIVE'] = pdata.best_bound
name_values['STAT.cplex.solve.explored'] = pdata.current_nb_nodes
name_values['STAT.cplex.solve.opened'] = pdata.remaining_nb_nodes
name_values['STAT.cplex.solve.iterationCount'] = pdata.current_nb_iterations
name_values['STAT.cplex.solve.elapsedTime'] = pdata.time
# add KPIs
publish_name_fn = self.publish_name_fn
name_values.update({publish_name_fn(kp.name): kp.compute(sol) for kp in self.model.iter_kpis()})
name_values[publish_name_fn('_time')] = pdata.time
# usually publish kpis in environment...
if self.publish_hook is not None:
self.publish_hook(name_values)
# save kpis.csv table
if auto_publishing_kpis_table_names(context) is not None:
write_kpis_table(env=env,
context=context,
model=self.model,
solution=sol)
def iter_kpis(self):
return iter(self._kpi_dicts)
def __as_df__(self, **kwargs):
try:
from pandas import DataFrame
except ImportError:
raise RuntimeError("convert as DataFrame: This feature requires pandas")
df = DataFrame(self._kpi_dicts, **kwargs)
return df
def get_auto_publish_names(context, prop_name, default_name):
# comparing auto_publish to boolean values because it can be a non-boolean
autopubs = context.solver.auto_publish
if autopubs == None:
return None
if autopubs is True:
return [default_name]
elif autopubs is False:
return None
elif prop_name in autopubs:
name = autopubs[prop_name]
else:
name = None
if isinstance(name, str):
# only one string value: make this the name of the table
# in a list with one object
name = [name]
elif name is True:
# if true, then use default name:
name = [default_name]
elif name is False:
# Need to compare explicitely to False
name = None
else:
# otherwise the kpi_table_name can be a collection-like of names,
# just return it
pass
return name
def auto_publishing_result_output_names(context):
# Return the list of result output names for saving
return get_auto_publish_names(context, 'result_output', 'solution.json')
def auto_publishing_kpis_table_names(context):
# Return the list of kpi table names for saving
return get_auto_publish_names(context, 'kpis_output', 'kpis.csv')
def get_kpis_name_field(context):
autopubs = context.solver.auto_publish
if autopubs is True:
field = 'Name'
elif autopubs is False:
field = None
else:
field = context.solver.auto_publish.kpis_output_field_name
return field
def get_kpis_value_field(context):
autopubs = context.solver.auto_publish
if autopubs is True:
field = 'Value'
elif autopubs is False:
field = None
else:
field = context.solver.auto_publish.kpis_output_field_value
return field
[docs]class PublishResultAsDf(object):
'''Mixin for classes publishing a result as data frame
'''
@staticmethod
def value_if_defined(obj, attr_name, default=None):
value = getattr(obj, attr_name) if hasattr(obj, attr_name) else None
return value if value is not None else default
[docs] def write_output_table(self, df, context,
output_property_name=None,
output_name=None):
'''Publishes the output `df`.
The `context` is used to control the output name:
- If context.solver.auto_publish is true, the `df` is written using
output_name.
- If context.solver.auto_publish is false, This method does nothing.
- If context.solver.auto_publish.output_property_name is true,
then `df` is written using output_name.
- If context.solver.auto_publish.output_propert_name is None or
False, this method does nothing.
- If context.solver.auto_publish.output_propert_name is a string,
it is used as a name to publish the df
Example:
A solver can be defined as publishing a result as data frame::
class SomeSolver(PublishResultAsDf)
def __init__(self, output_customizer):
# output something if context.solver.autopublish.somesolver_output is set
self.output_table_property_name = 'somesolver_output'
# output filename unless specified by somesolver_output:
self.default_output_table_name = 'somesolver.csv'
# customizer if users wants one
self.output_table_customizer = output_customizer
# uses pandas.DataFrame if possible, otherwise will use namedtuples
self.output_table_using_df = True
def solve(self):
# do something here and return a result as a df
result = pandas.DataFrame(columns=['A','B','C'])
return result
Example usage::
solver = SomeSolver()
results = solver.solve()
solver.write_output_table(results)
'''
prop = self.value_if_defined(self, 'output_table_property_name')
prop = output_property_name if output_property_name else prop
default_name = self.value_if_defined(self, 'default_output_table_name')
default_name = output_name if output_name else default_name
names = get_auto_publish_names(context, prop, default_name)
use_df = self.value_if_defined(self, 'output_table_using_df', True)
if names:
env = get_environment()
customizer = self.value_if_defined(self, 'output_table_customizer', lambda x: x)
for name in names:
r = customizer(df)
if pd and use_df:
env.write_df(r, name)
else:
# assume r is a namedtuple
write_csv(env, r, r[0]._fields, name)
def is_publishing_output_table(self, context):
prop = self.value_if_defined(self, 'output_table_property_name')
default_name = self.value_if_defined(self, 'default_output_table_name')
names = get_auto_publish_names(context, prop, default_name)
return names
def write_kpis_table(env, context, model, solution):
names = auto_publishing_kpis_table_names(context)
kpis_table = []
for k in model.iter_kpis():
kpis_table.append([k.name, k.compute(solution)])
if kpis_table:
# do not create the kpi tables if there are no kpis to be written
field_names = [get_kpis_name_field(context),
get_kpis_value_field(context)]
for name in names:
write_table_as_csv(env, kpis_table, name, field_names)
def write_solution(env, solution, name):
with env.get_output_stream(name) as output:
output.write(solution.export_as_json_string().encode('utf-8'))
def write_result_output(env, context, model, solution):
names = auto_publishing_result_output_names(context)
for name in names:
write_solution(env, solution, name)