Source code for docplex.mp.sparktrans.transformers
# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2018
# --------------------------------------------------------------------------
try:
    from pyspark import keyword_only  # pyspark < 2.0 -> pyspark.ml.util.keyword_only
    from pyspark.ml import Transformer
    from pyspark.ml.param.shared import Param
    from pyspark.sql import SparkSession
    spark_version = SparkSession.builder.getOrCreate().version
except ImportError:
    # pyspark is not available: provide stubs so this module still imports
    spark_version = None

    def keyword_only(x):
        return x
    Transformer = object
    Param = SparkSession = None

from docplex.mp.constants import ObjectiveSense
from docplex.mp.sktrans.modeler import make_modeler
from docplex.mp.utils import *
from docplex.mp.sparktrans.spark_utils import make_solution

def convert_to_list_or_value(value):
    # Normalize Spark/pandas/numpy containers to plain Python lists;
    # any other value is returned unchanged.
    if is_spark_dataframe(value):
        return value.rdd.flatMap(lambda x: x).collect()
    elif is_pandas_series(value):
        return value.tolist()
    elif is_numpy_ndarray(value):
        return value.tolist()
    return value
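
# A minimal sketch (not part of the library) of what convert_to_list_or_value
# returns for each supported input type; the sample values are illustrative only:
#
#     import numpy as np
#     import pandas as pd
#     convert_to_list_or_value(pd.Series([1, 2, 3]))   # -> [1, 2, 3]
#     convert_to_list_or_value(np.array([4.0, 5.0]))   # -> [4.0, 5.0]
#     convert_to_list_or_value([7, 8])                 # -> [7, 8], returned unchanged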


class CplexTransformerBase(Transformer):
    """ Root class for CPLEX transformers for PySpark.
    """

    @keyword_only
    def __init__(self, rhsCol=None, minCol=None, maxCol=None, y=None, lbs=None, ubs=None,
                 types=None, sense="min", modeler="cplex", solveParams=None):
        super(CplexTransformerBase, self).__init__()
        self.rhsCol = Param(self, "rhsCol", "")
        self.minCol = Param(self, "minCol", "")
        self.maxCol = Param(self, "maxCol", "")
        self.y = Param(self, "y", "")
        self.lbs = Param(self, "lbs", "")
        self.ubs = Param(self, "ubs", "")
        self.types = Param(self, "types", "")
        self.sense = Param(self, "sense", "")
        self.modeler = Param(self, "modeler", "")
        self.solveParams = Param(self, "solveParams", "")
        self._setDefault(rhsCol=None)
        self._setDefault(minCol=None)
        self._setDefault(maxCol=None)
        self._setDefault(y=None)
        self._setDefault(lbs=None)
        self._setDefault(ubs=None)
        self._setDefault(types=None)
        self._setDefault(sense=sense)
        self._setDefault(modeler=modeler)
        self._setDefault(solveParams={})
        # Before Spark 2.2, @keyword_only stored the captured kwargs on the
        # decorated function rather than on the instance.
        if spark_version is not None and spark_version < '2.2':
            kwargs = self.__init__._input_kwargs
        else:
            kwargs = self._input_kwargs
        self.setParams(**kwargs)
        self.docplex_modeler = make_modeler(self.getModeler())

    @keyword_only
    def setParams(self, rhsCol=None, minCol=None, maxCol=None, y=None, lbs=None, ubs=None,
                  types=None, solveParams=None, sense=None, modeler=None):
        if spark_version is not None and spark_version < '2.2':
            # Read the kwargs captured for this method, not those of __init__
            kwargs = self.setParams._input_kwargs
        else:
            kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setRhsCol(self, value):
        self._paramMap[self.rhsCol] = value
        return self

    def getRhsCol(self):
        return self.getOrDefault(self.rhsCol)

    def setMinCol(self, value):
        self._paramMap[self.minCol] = value
        return self

    def getMinCol(self):
        return self.getOrDefault(self.minCol)

    def setMaxCol(self, value):
        self._paramMap[self.maxCol] = value
        return self

    def getMaxCol(self):
        return self.getOrDefault(self.maxCol)

    def setY(self, value):
        self._paramMap[self.y] = value
        return self

    def getY(self):
        return self.getOrDefault(self.y)

    def setLbs(self, value):
        self._paramMap[self.lbs] = value
        return self

    def getLbs(self):
        return self.getOrDefault(self.lbs)

    def setUbs(self, value):
        self._paramMap[self.ubs] = value
        return self

    def getUbs(self):
        return self.getOrDefault(self.ubs)

    def setTypes(self, value):
        self._paramMap[self.types] = value
        return self

    def getTypes(self):
        return self.getOrDefault(self.types)

    def setSense(self, value):
        self._paramMap[self.sense] = value
        return self

    def getSense(self):
        return self.getOrDefault(self.sense)

    def setModeler(self, value):
        self._paramMap[self.modeler] = value
        return self

    def getModeler(self):
        return self.getOrDefault(self.modeler)

    def setSolveParams(self, value):
        self._paramMap[self.solveParams] = value
        return self

    def getSolveParams(self):
        return self.getOrDefault(self.solveParams)

    def _transform(self, dataset):
        """ Main method to solve linear programming problems.

        Transforms the input dataset.

        :param dataset: the matrix describing the constraints of the problem, an instance
           of :py:class:`pyspark.sql.DataFrame`

        :returns: the transformed dataset
        """
        objSense = ObjectiveSense.parse(self.getSense())
        var_lbs = convert_to_list_or_value(self.getLbs())
        var_ubs = convert_to_list_or_value(self.getUbs())
        var_types = self.getTypes()
        min_max_cols = [self.getMinCol(), self.getMaxCol()] if self.getMinCol() else [self.getRhsCol()]
        coef_colnames = [item for item in dataset.columns if item not in min_max_cols]
        nb_vars = len(coef_colnames)
        # Get min and max values as lists
        minVals, maxVals = self._get_min_max_lists(dataset)
        # Get the cost coefficients for the objective
        costs = None if self.getY() is None else convert_to_list_or_value(self.getY())
        assert (costs is None) or (len(costs) == nb_vars)
        # Build the matrix of constraint coefficients
        cts_mat = dataset.rdd.map(lambda x: [x[col] for col in coef_colnames]).collect()
        # Build the model and solve it
        params = self.getSolveParams()  # user-defined engine parameters
        if minVals is None:
            result = self.docplex_modeler.build_matrix_linear_model_and_solve(
                nb_vars, var_lbs, var_ubs, var_types, coef_colnames,
                cts_mat, maxVals,
                objsense=objSense, costs=costs,
                cast_to_float=True,
                solution_maker=make_solution,
                **params)
        else:
            result = self.docplex_modeler.build_matrix_range_model_and_solve(
                nb_vars, var_lbs, var_ubs, var_types, coef_colnames,
                cts_mat=cts_mat,
                range_mins=minVals, range_maxs=maxVals,
                objsense=objSense, costs=costs,
                cast_to_float=True,
                solution_maker=make_solution,
                **params)
        # 'result' is a list of variable values: create a Spark DataFrame from it
        sparkCtx = dataset.sql_ctx
        return sparkCtx.createDataFrame(zip(coef_colnames, result), schema=['name', 'value'])
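
# The DataFrame returned by _transform has one row per decision variable, with
# schema ['name', 'value']; the values (shown as '...' since they depend on the
# solve) are the solution values of the variables:
#
#     +----+-----+
#     |name|value|
#     +----+-----+
#     |   x|  ...|
#     |   y|  ...|
#     +----+-----+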


class CplexTransformer(CplexTransformerBase):
    """ A PySpark transformer class to solve linear problems.

    This transformer class solves LP problems of the type::

        Ax <= B
    """

    @keyword_only
    def __init__(self, rhsCol=None, y=None, lbs=None, ubs=None, types=None, sense="min",
                 modeler="cplex", solveParams=None):
        """
        Creates an instance of CplexTransformer to solve linear problems.

        :param rhsCol: the name of the column in the input Spark DataFrame containing the upper bounds
           (right-hand sides) of the constraints.
        :param y: an optional sequence of scalars describing the cost vector
        :param lbs: an optional sequence of scalars describing the lower bounds of the decision variables
        :param ubs: an optional sequence of scalars describing the upper bounds of the decision variables
        :param types: a string of variable types, matching [BICSN]*
        :param sense: the objective sense. Accepts "min" or "max" (case-insensitive),
           or an instance of docplex.mp.constants.ObjectiveSense
        :param solveParams: optional keyword arguments passing additional parameters to the CPLEX engine.

        Note:
            The Spark DataFrame representing the matrix is expected to have shape (M, N+1), where M is
            the number of rows and N the number of variables. The column named by the 'rhsCol' parameter
            contains the right-hand sides of the problem (the B in Ax <= B).
            The optional vector y contains the N cost coefficients, one per column variable.

        Example:
            Passing the Spark DataFrame ([[1,2,3], [4,5,6]], ['x', 'y', 'max']) with rhsCol='max'
            and y=[11,12] means solving the linear problem::

                minimize 11x + 12y
                s.t.
                    1x + 2y <= 3
                    4x + 5y <= 6
        """
        if spark_version is not None and spark_version < '2.2':
            super(CplexTransformer, self).__init__(**self.__init__._input_kwargs)
        else:
            super(CplexTransformer, self).__init__(**self._input_kwargs)

    def _get_min_max_lists(self, dataset):
        assert self.getRhsCol() and self.getRhsCol() in dataset.columns, \
            '"rhsCol" parameter must name a column of the dataset'
        maxVals = dataset.select(self.getRhsCol()).rdd.flatMap(lambda x: x).collect()
        # Ax <= B: constraints have upper bounds only
        return None, maxVals
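
# A usage sketch (not from the library) replaying the docstring example above,
# assuming a local SparkSession and a working CPLEX/docplex installation:
#
#     from pyspark.sql import SparkSession
#     spark = SparkSession.builder.getOrCreate()
#     df = spark.createDataFrame([[1, 2, 3], [4, 5, 6]], ['x', 'y', 'max'])
#     lp = CplexTransformer(rhsCol='max', y=[11, 12], sense='min')
#     lp.transform(df).show()   # one row per variable, columns ['name', 'value']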


class CplexRangeTransformer(CplexTransformerBase):
    """ A PySpark transformer class to solve range-based linear problems.

    This transformer class solves LP problems of the type::

        B <= Ax <= C
    """

    @keyword_only
    def __init__(self, minCol=None, maxCol=None, y=None, lbs=None, ubs=None, types=None,
                 sense="min", modeler="cplex", solveParams=None):
        """
        Creates an instance of CplexRangeTransformer to solve range-based linear problems.

        :param minCol: the name of the column in the input Spark DataFrame containing the lower bounds
           of the constraints.
        :param maxCol: the name of the column in the input Spark DataFrame containing the upper bounds
           of the constraints.
        :param y: an optional sequence of scalars describing the cost vector
        :param lbs: an optional sequence of scalars describing the lower bounds of the decision variables
        :param ubs: an optional sequence of scalars describing the upper bounds of the decision variables
        :param types: a string of variable types, matching [BICSN]*
        :param sense: the objective sense. Accepts "min" or "max" (case-insensitive),
           or an instance of docplex.mp.constants.ObjectiveSense
        :param solveParams: optional keyword arguments passing additional parameters to the CPLEX engine.

        Note:
            The Spark DataFrame representing the matrix is expected to have shape (M, N+2), where M is
            the number of rows and N the number of variables. The columns named by the 'minCol' and
            'maxCol' parameters contain the minimum (resp. maximum) values of the row ranges, that is,
            the m and M in::

                m <= Ax <= M

            The optional vector y contains the N cost coefficients, one per column variable.

        Example:
            Passing the Spark DataFrame ([[1,2,3,30], [4,5,6,60]], ['x', 'y', 'min', 'max']) with
            minCol='min', maxCol='max' and y=[11,12] means solving the linear problem::

                minimize 11x + 12y
                s.t.
                    3 <= 1x + 2y <= 30
                    6 <= 4x + 5y <= 60
        """
        if spark_version is not None and spark_version < '2.2':
            super(CplexRangeTransformer, self).__init__(**self.__init__._input_kwargs)
        else:
            super(CplexRangeTransformer, self).__init__(**self._input_kwargs)

    def _get_min_max_lists(self, dataset):
        assert self.getMinCol() and self.getMinCol() in dataset.columns, \
            '"minCol" parameter must name a column of the dataset'
        assert self.getMaxCol() and self.getMaxCol() in dataset.columns, \
            '"maxCol" parameter must name a column of the dataset'
        minVals = dataset.select(self.getMinCol()).rdd.flatMap(lambda x: x).collect()
        maxVals = dataset.select(self.getMaxCol()).rdd.flatMap(lambda x: x).collect()
        return minVals, maxVals
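
# A usage sketch (not from the library) replaying the range example above,
# under the same assumptions as the CplexTransformer sketch:
#
#     from pyspark.sql import SparkSession
#     spark = SparkSession.builder.getOrCreate()
#     df = spark.createDataFrame([[1, 2, 3, 30], [4, 5, 6, 60]], ['x', 'y', 'min', 'max'])
#     rlp = CplexRangeTransformer(minCol='min', maxCol='max', y=[11, 12], sense='min')
#     rlp.transform(df).show()  # one row per variable, columns ['name', 'value']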