Source code for docplex.cp.function

# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2015 .. 2022
# --------------------------------------------------------------------------
# Author: Philippe LABORIE, IBM Analytics, France Lab, Gentilly

"""
This module contains the objects representing CP Optimizer function expressions,
that is, stepwise and piecewise linear functions.

In particular, it defines the following classes:

 * :class:`CpoFunction`: the root class of all function expressions,
 * :class:`CpoSegmentedFunction`: for functions represented as a list of segments,
 * :class:`CpoStepFunction`: for functions represented as a list of steps.


Detailed description
--------------------
"""

from docplex.cp.catalog import Type_SegmentedFunction, Type_StepFunction
from docplex.cp.expression import CpoExpr, INTERVAL_MIN, INTERVAL_MAX
from docplex.cp.utils import *
import bisect
import copy


class CpoFunction(CpoExpr):
    """ Base class shared by step functions and segmented (piecewise linear) functions.

    A function is stored as an initial segment, described by its slope `_s0` and by the
    value `_v0` it takes at the first abscissa, followed by three parallel lists `_x`,
    `_v` and `_s` giving, for each subsequent segment, its start abscissa, its value at
    that abscissa and its slope.
    """
    __slots__ = (
        '_s0',  # Slope of the initial segment
        '_v0',  # Value of the initial segment
        '_x',   # Start abscissa of each segment
        '_v',   # Value of each segment at its start abscissa
        '_s',   # Slope of each segment
    )

    def __init__(self, typ, s0=None, v0=None, x=None, v=None, s=None, name=None):
        """ **Constructor**

        The lists given in `x`, `v` and `s` are stored as they are (they are not copied).

        Args:
            typ:  Function type.
            s0:   Slope of the initial segment (0 if not given).
            v0:   Value of the initial segment (0 if not given).
            x:    Abscissas, that must be integer and strictly increasing (none if not given).
            v:    Values, one per abscissa. Must be integer for a step function.
            s:    Slopes, one per abscissa (all zero if not given).
            name: Name given to the parent expression constructor.
        """
        super(CpoFunction, self).__init__(typ, name)

        # Initial value
        self._v0 = 0 if v0 is None else v0

        # Initial slope
        if s0 is not None:
            # A non-zero slope is accepted only on a segmented function
            assert (s0 == 0) or (typ == Type_SegmentedFunction), "Slopes are only supported for segmented functions"
            # A non-zero initial slope must be anchored on a first abscissa
            assert s0 == 0 or (x is not None and 0 < len(x)), "No abscissa for initial slope"
        self._s0 = 0 if s0 is None else s0

        # Abscissas
        if x is not None:
            assert all(is_int(elt) for elt in x), "Abscissa value is not an integer"
            assert all(x[i - 1] < x[i] for i in range(1, len(x))), "Abscissa values are not strictly increasing"
            self._x = x
        else:
            self._x = []
        nb_points = len(self._x)

        # Values
        if v is not None:
            if typ == Type_StepFunction:
                # Step functions take integer values only
                assert all(is_int(elt) for elt in v), "Value is not an integer"
            self._v = v
        else:
            self._v = []
        assert len(self._v) == nb_points, "Lists x and v must have the same length"

        # Slopes
        if s is not None:
            # A non-zero slope is accepted only on a segmented function
            assert all(slope == 0 for slope in s) or typ == Type_SegmentedFunction, "Slopes are only supported for segmented functions"
            self._s = s
        else:
            self._s = [0] * len(self._x)
        assert len(self._s) == nb_points, "Lists x and s must have the same length"

    def _get_expr_string(self):
        """ Get the string representing this expression, name excluded.

        Returns:
            Constant placeholder string.
        """
        return "Function(...)"

    @property
    def v0(self):
        """ Value of the initial segment. """
        return self._v0

    @property
    def s0(self):
        """ Slope of the initial segment. """
        return self._s0

    @property
    def x(self):
        """ List of segment start abscissas (the internal list, not a copy). """
        return self._x

    @property
    def v(self):
        """ List of segment values (the internal list, not a copy). """
        return self._v

    @property
    def s(self):
        """ List of segment slopes (the internal list, not a copy). """
        return self._s

    @property
    def is_step_function(self):
        """ True if the type of this function is Type_StepFunction. """
        return self.type == Type_StepFunction

    @property
    def is_segmented_function(self):
        """ True if the type of this function is Type_SegmentedFunction. """
        return self.type == Type_SegmentedFunction
[docs] def get_value(self, t): """ Gets the value of the function. Gets the value of the function for an abscissa `t`. Complexity is in `O(log n)` for a function with `n` segments. Args: t (int or float): Abscissa value. Returns: Value of the function for abscissa `t`. """ l = len(self._x) if l == 0: # Constant step function return self._v0 if t < self._x[0]: # Abscissa t is on initial step return self._v0 - (self._x[0] - t) * self._s0 # Abscissa is on another step i = bisect.bisect_right(self._x, t) if self._s is None: return self._v[i - 1] else: return self._v[i - 1] + (t - self._x[i - 1]) * self._s[i - 1]
[docs] def copy(self, type=None): """ Creates and returns a copy of the function. """ res = copy.copy(self) res._x = self._x[:] res._v = self._v[:] res._s = self._s[:] if type is not None: res.tp = type return res
    def set_slope(self, x1, x2, v1, s):
        """ Sets the value of the invoking function over an interval.

        Sets the value of the invoking function on interval `[x1,x2)` to be equal to
        `f(x) = v1 + s * (x-x1)`. The function is modified in place.
        Nothing is done if the interval is empty (`x2 <= x1`).

        Args:
            x1 (int): Start of the interval.
            x2 (int): End of the interval.
            v1 (int or float): Function value at `x1`.
            s (int or float): Function slope on interval `[x1,x2)`.
        """
        assert s == 0 or not self.is_step_function, "Function not allowed on CpoStepFunction"
        assert is_int(x1) and is_int(x2), "Interval [x1,x2) must be integer"
        if x2 <= x1:
            # Empty interval
            return
        l = len(self._x)
        if l == 0:
            # Constant step function of value self._v0
            if not (s == 0 and v1 == self._v0):
                # New segment differs from the constant: create it
                self._x.append(x1)
                self._v.append(v1)
                self._s.append(s)
                if x2 < INTERVAL_MAX:
                    # Go back to the constant value after the interval
                    self._x.append(x2)
                    self._v.append(self._v0)
                    self._s.append(0)
            return
        if x1 < self._x[0]:
            # Interval starts on the initial segment
            if self._s0 == s and self._v0 + s * (x1 - self._x[0]) == v1:
                # No change on initial segment
                if x2 <= self._x[0]:
                    # Completely inside initial segment
                    return
                # Make initial segment longer, _v0 becomes the value at x2
                self._v0 += s * (x2 - self._x[0])
                # Index of the segment that contains x2
                i2 = bisect.bisect_right(self._x, x2) - 1
                if 0 < i2:
                    # Remove segments 0..i2-1 and shift arrays
                    del self._x[:i2]
                    del self._v[:i2]
                    del self._s[:i2]
                # Adjust beginning of first segment, that now starts at x2
                self._v[0] += (x2 - self._x[0]) * self._s[0]
                self._x[0] = x2
                return
            else:
                # Adjust value of initial segment, that now ends at x1
                # (v0 and x0 keep the previous end value and end abscissa)
                v0 = self._v0
                x0 = self._x[0]
                self._v0 += self._s0 * (x1 - self._x[0])
                # Add new segment
                self._x.insert(0, x1)
                self._v.insert(0, v1)
                self._s.insert(0, s)
                if x2 < x0:
                    # Interval ends before the previous first abscissa:
                    # restore the previous initial line on [x2, x0)
                    self._x.insert(1, x2)
                    self._v.insert(1, v0 + (x2 - x0) * self._s0)
                    self._s.insert(1, self._s0)
                    return
                elif x0 <= x2:
                    # Process the remaining part [x0, x2) of the interval
                    # (the recursive call returns immediately if x0 == x2)
                    self.set_slope(x0, x2, v1 + (x0 - x1) * s, s)
                    return
        else:
            # x1 >= self._x[0]:
            # Indexes of the segments that contain x1 and x2
            i1 = bisect.bisect_right(self._x, x1) - 1
            i2 = bisect.bisect_right(self._x, x2) - 1
            if self._s[i1] == s and self._v[i1] + s * (x1 - self._x[i1]) == v1:
                # Same as current segment
                if i1 == i2:
                    # Interval is completely inside current segment
                    return
                else:
                    # Process the remaining part of the interval, from next segment
                    self.set_slope(self._x[i1 + 1], x2, v1 + (self._x[i1 + 1] - x1) * s, s)
                    return
            else:
                # Keep description of the segment that contains x1
                xcurr = self._x[i1]
                vcurr = self._v[i1]
                scurr = self._s[i1]
                if x1 > self._x[i1]:
                    # Split current segment at x1, indexes are shifted by the insertion
                    self._x.insert(i1 + 1, x1)
                    self._v.insert(i1 + 1, v1)
                    self._s.insert(i1 + 1, s)
                    i1 += 1
                    i2 += 1
                else:
                    # Interval starts exactly on current segment start: overwrite it
                    assert x1 == self._x[i1]
                    self._x[i1] = x1
                    self._v[i1] = v1
                    self._s[i1] = s
                if i1 == i2:
                    # Interval ends inside the segment where it starts:
                    # restore the previous line from x2
                    self._x.insert(i1 + 1, x2)
                    self._v.insert(i1 + 1, vcurr + (x2 - xcurr) * scurr)
                    self._s.insert(i1 + 1, scurr)
                    return
                else:
                    # Adjust beginning of the segment that contains x2, that now starts at x2
                    self._v[i2] += (x2 - self._x[i2]) * self._s[i2]
                    self._x[i2] = x2
                    # Remove segments that are completely covered by the interval
                    del self._x[i1 + 1:i2]
                    del self._v[i1 + 1:i2]
                    del self._s[i1 + 1:i2]
                    return
    def add_slope(self, x1, x2, v1, s):
        """ Adds a piecewise linear step on an interval.

        Adds a piecewise linear step `f(x) = v1 + s * (x-x1)` to the invoking function
        on interval `[x1,x2)`. The function is modified in place.
        Nothing is done if both `v1` and `s` are zero, or if the interval is empty (`x2 <= x1`).

        Args:
            x1 (int): Start of the interval.
            x2 (int): End of the interval.
            v1 (int or float): Added value at `x1`.
            s (int or float): Added function slope on interval `[x1,x2)`.
        """
        if v1 == 0 and s == 0:
            # Nothing to add
            return
        assert s == 0 or not self.is_step_function, "Function not allowed on CpoStepFunction"
        assert is_int(x1) and is_int(x2), "Interval [x1,x2) must be integer"
        if x2 <= x1:
            # Empty interval
            return
        l = len(self._x)
        if l == 0:
            # Constant step function of value self._v0
            self._x.append(x1)
            self._v.append(self._v0 + v1)
            self._s.append(s)
            if x2 < INTERVAL_MAX:
                # Go back to the constant value after the interval
                self._x.append(x2)
                self._v.append(self._v0)
                self._s.append(0)
            return
        if x1 < self._x[0]:
            # Interval starts on the initial segment
            # Adjust value of initial segment, that now ends at x1
            # (v0 and x0 keep the previous end value and end abscissa)
            v0 = self._v0
            x0 = self._x[0]
            self._v0 += self._s0 * (x1 - self._x[0])
            # Add new segment, sum of the initial line and of the added step
            self._x.insert(0, x1)
            self._v.insert(0, self._v0 + v1)
            self._s.insert(0, self._s0 + s)
            if x2 < x0:
                # Interval ends before the previous first abscissa:
                # restore the previous initial line on [x2, x0)
                self._x.insert(1, x2)
                self._v.insert(1, v0 + (x2 - x0) * self._s0)
                self._s.insert(1, self._s0)
                return
            elif x0 <= x2:
                # Process the remaining part [x0, x2) of the interval
                # (the recursive call returns immediately if x0 == x2)
                self.add_slope(x0, x2, v1 + (x0 - x1) * s, s)
                return
        else:
            # x1 >= self._x[0]:
            # Indexes of the segments that contain x1 and x2
            i1 = bisect.bisect_right(self._x, x1) - 1
            i2 = bisect.bisect_right(self._x, x2) - 1
            # Keep description of the segment that contains x1
            xcurr = self._x[i1]
            vcurr = self._v[i1]
            scurr = self._s[i1]
            if x1 > self._x[i1]:
                # Split current segment at x1, indexes are shifted by the insertion
                self._x.insert(i1 + 1, x1)
                self._v.insert(i1 + 1, vcurr + (x1 - self._x[i1]) * scurr + v1)
                self._s.insert(i1 + 1, scurr + s)
                i1 += 1
                i2 += 1
            else:
                # Interval starts exactly on current segment start: update it
                assert x1 == self._x[i1]
                self._x[i1] = x1
                self._v[i1] = vcurr + v1
                self._s[i1] = scurr + s
            # Update segments that are completely covered by the interval
            for i in range(i1 + 1, i2):
                self._v[i] = self._v[i] + v1 + (self._x[i] - x1) * s
                self._s[i] += s
            if i1 == i2:
                # Interval ends inside the segment where it starts:
                # restore the previous line from x2
                self._x.insert(i1 + 1, x2)
                self._v.insert(i1 + 1, vcurr + (x2 - xcurr) * scurr)
                self._s.insert(i1 + 1, scurr)
                return
            elif x2 > self._x[i2]:
                # Interval ends strictly inside segment i2: update its beginning
                # and restore its previous line from x2
                xcurr = self._x[i2]
                vcurr = self._v[i2]
                scurr = self._s[i2]
                self._v[i2] = self._v[i2] + v1 + (self._x[i2] - x1) * s
                self._s[i2] += s
                self._x.insert(i2 + 1, x2)
                self._v.insert(i2 + 1, vcurr + (x2 - xcurr) * scurr)
                self._s.insert(i2 + 1, scurr)
                return
def _add(self, other): ov0 = other._v0 os0 = other._s0 ol = len(other._x) if ol == 0: # 'other' is a constant function of value ov0 self._v0 += ov0 for i in range(len(self._x)): self._v[i] += ov0 elif not self._x: # 'self' is a constant function of value _v0 self._x = other._x[:] self._s = other._s[:] self._v = [v + self._v0 for v in other._v] self._v0 += ov0 else: ox0 = other._x[0] if ox0 == self._x[0]: self._v0 += ov0 self._s0 += os0 elif ox0 < self._x[0]: self._v0 = (ox0 - self._x[0]) * self._s0 self._x.insert(0, ox0) self._v.insert(0, self._v0) self._s.insert(0, self._s0) self._v0 += ov0 self._s0 += os0 else: # self._x[0] < ox0 self._s0 += os0 self._v0 += ov0 + (self._x[0] - ox0) * os0 self.add_slope(self._x[0], ox0, ov0 + (self._x[0] - ox0) * os0, os0) for i in range(ol - 1): # This could be optimized to avoid calling add_slope self.add_slope(other._x[i], other._x[i + 1], other._v[i], other._s[i]) self.add_slope(other._x[ol - 1], INTERVAL_MAX, other._v[ol - 1], other._s[ol - 1]) return self def _add_number(self, k): self._v0 += k for i in range(len(self._x)): self._v[i] += k return self def _mul(self, k): self._s0 *= k self._v0 *= k for i in range(len(self._x)): self._v[i] *= k self._s[i] *= k return self def __iadd__(self, other): if isinstance(other, CpoFunction): assert (self.is_segmented_function or not other.is_segmented_function) return self._add(other) elif is_number(other) or is_bool(other): return self._add_number(other) def __isub__(self, other): if isinstance(other, CpoFunction): assert (self.is_segmented_function or not other.is_segmented_function) tmp = other.copy()._mul(-1) return self._add(tmp) elif is_number(other) or is_bool(other): return self._add_number(-other) def __add__(self, other): if isinstance(other, CpoFunction): if self.is_segmented_function: return other.copy(Type_SegmentedFunction)._add(self) else: return other.copy()._add(self) elif is_number(other) or is_bool(other): return self.copy()._add_number(other) def __radd__(self, 
value): return self.__add__(value) def __sub__(self, other): if isinstance(other, CpoFunction): if self.is_segmented_function: return other.copy(Type_SegmentedFunction)._mul(-1)._add(self) else: return other.copy()._mul(-1)._add(self) elif is_number(other) or is_bool(other): return self.copy()._add_number(-other) def __neg__(self): return self.copy()._mul(-1) def __imul__(self, value): return self._mul(value) def __mul__(self, value): return self.copy()._mul(value) def __rmul__(self, value): return self.__mul__(value)
[docs] def set_value(self, x1, x2, v): """ Sets the value of the function over an interval. Sets the value of the invoking function to be constant and equal to `v` on the interval `[x1,x2)`. Args: x1 (int): Start of the interval (included). x2 (int): End of the interval (excluded) v (int or float): Function value. """ self.set_slope(x1, x2, v, 0)
[docs] def add_value(self, x1, x2, v): """ Adds a constant to the value of the function over an interval. Adds constant value `v` to the value of the invoking function on the interval `[x1,x2)`. Args: x1 (int): Start of the interval. x2 (int): End of the interval. v (int or float): Added value. """ self.add_slope(x1, x2, v, 0)
def __pos__(self): return self
class CpoSegmentedFunction(CpoFunction):
    """ Class representing a segmented function.

    In CP Optimizer, piecewise linear functions are typically used in modeling a known function of time,
    for instance the cost that is incurred for completing an activity after a known date.

    A segmented function is a piecewise linear function whose domain is partitioned into segments
    such that over each segment, the function is linear.

    NOTE(review): the original documentation states that consecutive colinear segments are merged
    so that the function is always represented with the minimal number of segments.
    No code visible in this module performs such a merge - to be confirmed.
    """

    def __init__(self, segment0=None, segments=None, name=None):
        """ **Constructor**

        Args:
            segment0 (tuple): Initial segment of the function, as a tuple (slope, vright).
                              Slope and value are zero if not given.
            segments (list):  Segments of the function, as a list of tuples (xleft, vleft, slope).
                              No segment if not given.
            name:             (Optional) Function name.
        """
        # Initial segment
        if segment0 is None:
            slope0, value0 = 0, 0
        else:
            slope0 = segment0[0]
            value0 = segment0[1]

        # Split segments (in a single pass) into abscissas, values and slopes
        triples = [] if segments is None else [(seg[0], seg[1], seg[2]) for seg in segments]
        abscissas = [tpl[0] for tpl in triples]
        values = [tpl[1] for tpl in triples]
        slopes = [tpl[2] for tpl in triples]

        super(CpoSegmentedFunction, self).__init__(Type_SegmentedFunction,
                                                   s0=slope0, v0=value0,
                                                   x=abscissas, v=values, s=slopes,
                                                   name=name)

    def get_segment_list(self):
        """ Returns the list of segments of the function.

        Returns:
            List starting with the initial segment as a tuple (slope, value),
            followed by one tuple (x, value, slope) per segment.
        """
        initial = (self.s0, self.v0)
        return [initial] + [(xi, self.v[i], self.s[i]) for i, xi in enumerate(self.x)]

    def __str__(self):
        """ Build a string representing this function, prefixed by its name if any. """
        text = "SegmentedFunction(" + str(self.get_segment_list()) + ")"
        return (self.name + "=" + text) if self.name else text
class CpoStepFunction(CpoFunction):
    """ Class representing a step function.

    In CP Optimizer, stepwise functions are typically used to model the efficiency of a resource over time.
    A stepwise function is a special case of piecewise linear function where all slopes are equal to 0 and
    the domain and image of the function are integer.

    The function steps are expressed as a list of couples (x, val) specifying that the value of the function
    is *val* after *x*, up to the next step.
    By default, the value of the function is zero up to the first step.
    To change this default value, the first step should be (INTERVAL_MIN, value).

    NOTE(review): the original documentation states that consecutive steps with the same value are merged
    so that the function is always represented with the minimal number of steps.
    No code visible in this module performs such a merge - to be confirmed.
    """

    def __init__(self, steps=None, name=None):
        """ **Constructor**

        Args:
            steps: (Optional) Function steps, expressed as a list of couples (x, val).
            name:  (Optional) Function name
        """
        # With no step, parent constructor defaults apply (value zero, no abscissa)
        v0 = None
        x = None
        v = None
        if steps:
            x = []
            v = []
            first = steps[0]
            if first[0] == INTERVAL_MIN:
                # First step only gives the initial value of the function
                v0 = first[1]
            else:
                v0 = 0
                x.append(first[0])
                v.append(first[1])
            for step in steps[1:]:
                x.append(step[0])
                v.append(step[1])
        # Parent constructor is called only once, with the final values
        super(CpoStepFunction, self).__init__(typ=Type_StepFunction, s0=None, v0=v0, x=x, v=v, s=None, name=name)

    def get_step_list(self):
        """ Returns the list of steps of the function.

        Returns:
            List of couples (x, val). If the initial value of the function is not zero,
            the list starts with the couple (INTERVAL_MIN, initial value).
        """
        steps = []
        if self.v0 != 0:
            steps.append((INTERVAL_MIN, self.v0))
        steps.extend((xi, self.v[i]) for i, xi in enumerate(self.x))
        return steps

    def __str__(self):
        """ Build a string representing this function, prefixed by its name if any. """
        res = "StepFunction(" + str(self.get_step_list()) + ")"
        if self.name:
            res = self.name + "=" + res
        return res