# Source code for docplex.cp.fzn.fzn_parser

# --------------------------------------------------------------------------
# Source file provided under Apache License, Version 2.0, January 2004,
# http://www.apache.org/licenses/
# (c) Copyright IBM Corp. 2017, 2018
# --------------------------------------------------------------------------
# Author: Olivier OUDOT, IBM Analytics, France Lab, Sophia-Antipolis

"""
Parser converting a FZN file to internal model representation.

This parser does not support the complete set of predicates described in the specifications of FlatZinc
that can be found here: http://www.minizinc.org/downloads/doc-1.6/flatzinc-spec.pdf

It mainly supports integer expressions, some floating point expressions, and custom
predicates related to scheduling.

The predicates that are supported are:

 * *array predicates*

   array_bool_and, array_bool_element, array_bool_or, array_bool_xor,
   array_float_element, array_int_element, array_set_element,
   array_var_bool_element, array_var_float_element, array_var_int_element, array_var_set_element.

 * *boolean predicates*

   bool2int, bool_and, bool_clause, bool_eq, bool_eq_reif, bool_le, bool_le_reif,
   bool_lin_eq, bool_lin_le, bool_lt, bool_lt_reif, bool_not, bool_or, bool_xor.

 * *integer predicates*

   int_abs, int_div, int_eq, int_eq_reif, int_le, int_le_reif, int_lin_eq, int_lin_eq_reif,
   int_lin_le, int_lin_le_reif, int_lin_ne, int_lin_ne_reif, int_lt, int_lt_reif, int_max, int_min,
   int_mod, int_ne, int_ne_reif, int_plus, int_times, int2float.

 * *float predicates*

   float_abs, float_exp, float_ln, float_log10, float_log2, float_sqrt, float_eq, float_eq_reif,
   float_le, float_le_reif, float_lin_eq, float_lin_eq_reif, float_lin_le, float_lin_le_reif, float_lin_lt,
   float_lin_lt_reif, float_lin_ne, float_lin_ne_reif, float_lt, float_lt_reif, float_max, float_min,
   float_ne, float_ne_reif, float_plus.

 * *set predicates*

   set_in, set_in_reif.

 * *custom predicates*

   all_different_int, subcircuit, count_eq_const, table_int, inverse,
   lex_lesseq_bool, lex_less_bool, lex_lesseq_int, lex_less_int, int_pow, cumulative


Detailed description
--------------------
"""

from docplex.cp.fzn.fzn_tokenizer import *
from docplex.cp.expression import *
from docplex.cp.solution import *
from docplex.cp.model import CpoModel
import docplex.cp.modeler as modeler
import docplex.cp.config as config
import docplex.cp.expression as expression
import collections
from docplex.cp.utils import xrange, is_int_value
import traceback


###############################################################################
## Constants
###############################################################################

###############################################################################
## Public classes
###############################################################################

class FznParserException(CpoException):
    """ Exception raised when the parsing of a FZN document fails.

    It specializes CpoException and carries only the error message.
    """

    def __init__(self, msg):
        """ Build the exception from its message

        Args:
            msg: Text describing the parsing error
        """
        # Explicit two-argument form of super() is kept for Python 2 compatibility
        super(FznParserException, self).__init__(msg)
# Parameter descriptor FznParameter = collections.namedtuple('FznParameter', ('name', # Variable name 'type', # Variable type (string) 'size', # Array size (if array), None for single value 'value', # Value ))
class FznObject(object):
    """ Common root of all FZN descriptors.

    It declares no attribute of its own; the empty __slots__ lets the
    sub-classes define slots without getting a per-instance __dict__.
    """
    __slots__ = tuple()
class FznParameter(FznObject):
    """ Descriptor of a FZN parameter

    Plain record storing the name, type, array size and value of a parameter.
    """
    __slots__ = ('name',   # Parameter name
                 'type',   # Parameter type
                 'size',   # Array size (if array), None for variable
                 'value',  # Initial value (if any)
                 )

    def __init__(self, name, type, size, value):
        """ Create a new FZN parameter

        Args:
            name:  Name of the parameter
            type:  Type of the parameter
            size:  Array size, None if not array
            value: Parameter value
        """
        self.name, self.type = name, type
        self.size, self.value = size, value

    def __str__(self):
        """ Build a one-line description of the parameter

        Returns:
            String 'name(type=T, size=N, value=[v1, v2, ...])' when a size is set,
            'name(type=T, value=V)' otherwise.
        """
        head = self.name + "(type=" + str(self.type)
        if not self.size:
            # No size: the value is printed as a whole
            return head + ', value=' + str(self.value) + ')'
        # A size is set: the value is iterated and printed element by element
        items = ', '.join(str(x) for x in self.value)
        return head + ', size=' + str(self.size) + ', value=[' + items + ']' + ')'
class FznVariable(FznObject):
    """ Descriptor of a FZN variable

    Stores what is known about a variable declaration: name, type, domain, optional
    array size, optional value and the annotations attached to the declaration.
    """
    __slots__ = ('name',         # Variable name
                 'type',         # Variable type (String)
                 'domain',       # Domain
                 'size',         # Array size (if array), None for variable
                 'value',        # Initial value (if any)
                 'annotations',  # Dictionary of annotations
                 # Attributes needed for model reduction
                 'ref_vars',     # Tuple of variables referenced by this variable
                 )

    def __init__(self, name, type, domain, annotations, size, value):
        """ Create a new FZN variable

        Args:
            name:        Name of the variable
            type:        Variable type
            domain:      Variable domain
            annotations: Declaration annotations (dictionary)
            size:        Array size, None if not array
            value:       Initial value, None if none
        """
        self.name = name
        self.type = type
        self.domain = domain
        self.annotations = annotations
        self.size = size
        self.value = value
        # Always give the slot a value, so that reading it never raises AttributeError
        self.ref_vars = ()

    def is_defined(self):
        """ Check if the variable is defined by a constraint

        Return:
            True if the declaration carries the 'is_defined_var' annotation, False otherwise
        """
        return 'is_defined_var' in self.annotations

    def is_introduced(self):
        """ Check if the variable is introduced

        Return:
            True if the declaration carries the 'var_is_introduced' annotation, False otherwise
        """
        return 'var_is_introduced' in self.annotations

    def is_output(self):
        """ Check if the variable is an output variable

        Return:
            True if the declaration carries the 'output_var' or 'output_array' annotation,
            False otherwise
        """
        return ('output_var' in self.annotations) or ('output_array' in self.annotations)

    def _get_domain_bounds(self):
        """ Get the variable domain bounds

        The first and last domain items are used. When such an item is a tuple,
        its first (resp. last) element is taken as the bound.

        Return:
            Tuple (min, max)
        """
        dmin = self.domain[0]
        dmin = dmin[0] if isinstance(dmin, tuple) else dmin
        dmax = self.domain[-1]
        dmax = dmax[-1] if isinstance(dmax, tuple) else dmax
        return (dmin, dmax)

    def __str__(self):
        """ Build a one-line description of the variable """
        lstr = [self.name, "(type=", self.type, ", dom=", str(self.domain)]
        if self.is_defined():
            lstr.append(", defined")
        if self.is_introduced():
            lstr.append(", introduced")
        if self.size:
            if self.value:
                # Array with a value: print each element in short form
                lstr.append(', value=[')
                for i, x in enumerate(self.value):
                    if i > 0:
                        lstr.append(', ')
                    if isinstance(x, tuple) and isinstance(x[0], FznVariable):
                        # Array access stored as a tuple (array variable, index)
                        lstr.append("{}[{}]".format(x[0].name, x[1]))
                    elif isinstance(x, FznVariable):
                        lstr.append(x.name)
                    else:
                        lstr.append(str(x))
                lstr.append(']')
            else:
                lstr.append(", size={}".format(self.size))
        elif self.value:
            lstr.append(", value={}".format(self.value))
        lstr.append(')')
        return ''.join(lstr)
class FznConstraint(FznObject):
    """ Descriptor of a FZN constraint

    Stores the predicate name, its arguments, and the variable defined by the
    constraint when a 'defines_var' annotation is present.
    """
    __slots__ = ('predicate',  # Name of the predicate
                 'args',       # Arguments
                 'defvar',     # Variable defined by this constraint (first argument of the
                               # 'defines_var' annotation), None if none
                 # Attributes needed for model reduction
                 'ref_vars',   # Tuple of variables referenced by this constraint, but not defined
                 )

    def __init__(self, predicate, args, annotations):
        """ Create a new FZN constraint

        Args:
            predicate:   Name of the predicate
            args:        List of arguments
            annotations: Declaration annotations (dictionary, value is tuple of annotation arguments)
        """
        self.predicate = predicate
        self.args = args
        # An annotation without argument is stored as an empty tuple: treat it as 'no defined variable'
        defargs = annotations.get('defines_var')
        self.defvar = defargs[0] if defargs else None
        self.ref_vars = ()

    def _ref_vars_iterator(self):
        """ Iterator on the variables that are referenced in the arguments of this constraint.

        Only direct arguments and direct elements of array arguments are considered.

        Returns:
            Iterator on all variables referenced by this constraint
        """
        for a in self.args:
            if is_array(a):
                for v in a:
                    if isinstance(v, FznVariable):
                        yield v
            elif isinstance(a, FznVariable):
                yield a

    def __str__(self):
        """ Build a one-line description of the constraint """
        lstr = [self.predicate, "("]
        for i, x in enumerate(self.args):
            if i > 0:
                lstr.append(', ')
            if isinstance(x, tuple) and isinstance(x[0], FznVariable):
                # Array access stored as a tuple (array variable, index)
                lstr.append("{}[{}]".format(x[0].name, x[1]))
            elif isinstance(x, (FznVariable, FznParameter)):
                lstr.append(x.name)
            elif isinstance(x, list):
                lstr.append("[{}]".format(', '.join(str(v) for v in x)))
            else:
                lstr.append(str(x))
        lstr.append(')')
        if self.defvar:
            lstr.append(":")
            # The defined variable may be a descriptor (with a name) or a plain symbol string
            lstr.append(str(getattr(self.defvar, 'name', self.defvar)))
        return ''.join(lstr)
class FznObjective(FznObject):
    """ Descriptor of a FZN objective

    Plain record storing the solve operation, its target expression and its annotations.
    """
    __slots__ = ('operation',    # Objective operation in 'satisfy', 'minimize', 'maximize'
                 'expr',         # Target expression
                 'annotations',  # Annotations
                 )

    def __init__(self, operation, expr, annotations):
        """ Create a new FZN objective

        Args:
            operation:   Objective operation in 'satisfy', 'minimize', 'maximize'
            expr:        Target expression
            annotations: Annotations
        """
        self.operation, self.expr, self.annotations = operation, expr, annotations

    def __str__(self):
        """ Build a one-line description 'operation expr (annotations)' """
        return "{0} {1} ({2})".format(self.operation, self.expr, self.annotations)
class FznReader(object):
    """ Reader of FZN file format

    Parses a FlatZinc source and stores what it declares as FZN descriptors: parameters,
    variables and constraints are appended to lists in reading order, and the solve item
    is stored as the objective.
    """
    __slots__ = ('source_file',  # Source file
                 'tokenizer',    # Reading tokenizer
                 'token',        # Last read token
                 'var_map',      # Dictionary of variables.
                                 # Key is variable name, value is variable descriptor
                 'parameters',   # List of parameters
                 'variables',    # List of variables
                 'constraints',  # List of model constraints
                 'objective',    # Model objective
                 )

    def __init__(self, mdl=None):
        """ Create a new FZN reader

        Args:
            mdl: Not used by the reader.
        """
        super(FznReader, self).__init__()
        self.source_file = None
        self.tokenizer = None
        self.token = None
        self.var_map = {}
        self.parameters = []
        self.variables = []
        self.constraints = []
        self.objective = None

    def parse(self, cfile):
        """ Parse a FZN file

        Result of the parsing is added to what has already been read by this reader.

        Args:
            cfile: FZN file to read
        Raises:
            FznParserException: Parsing exception
        """
        self.source_file = cfile
        self.tokenizer = FznTokenizer(file=cfile)
        try:
            self._read_document()
        finally:
            # Always drop the tokenizer, including when parsing fails
            self.tokenizer = None

    def parse_string(self, str):
        """ Parse a string

        Result of the parsing is added to what has already been read by this reader.

        Args:
            str: String to parse
        Raises:
            FznParserException: Parsing exception
        """
        self.tokenizer = FznTokenizer(input=str)
        try:
            self._read_document()
        finally:
            # Always drop the tokenizer, including when parsing fails
            self.tokenizer = None

    def write(self, out=None):
        """ Write the model.

        Parameters, variables and constraints are written one per line, in this order.

        If the given output is a string, it is considered as a file name that is opened
        by this method using 'utf-8' encoding.

        Args:
            out (Optional): Target output stream or file name. If not given, default value is sys.stdout.
        """
        # Check file
        if is_string(out):
            with open_utf8(os.path.abspath(out), mode='w') as f:
                self.write(f)
            return

        # Check default output
        if out is None:
            out = sys.stdout

        # Write model content on the requested output (and not on the standard output)
        for lst in (self.parameters, self.variables, self.constraints):
            for x in lst:
                out.write(str(x))
                out.write("\n")
        out.flush()

    def _read_document(self):
        """ Read all FZN document

        Items are expected in this order: predicate declarations, parameter and variable
        declarations, constraints, then an optional solve item.

        Raises:
            FznParserException: Parsing exception. Any other exception raised while reading
                is converted into a FznParserException holding its message.
        """
        try:
            self._next_token()
            while self._read_predicate():
                pass
            while self._read_parameter_or_variable():
                pass
            while self._read_constraint():
                pass
            self._read_objective()
        except FznParserException:
            # Already a parsing exception, forward as it is
            raise
        except Exception as e:
            if config.context.log_exceptions:
                traceback.print_exc()
            self._raise_exception(str(e))

        # Nothing is expected after the last recognized item
        if self.token is not TOKEN_EOF:
            self._raise_exception("Unexpected token '{}'".format(self.token))

    def _read_predicate(self):
        """ Read a predicate declaration

        The declaration is skipped up to its ending semicolon, nothing is stored.

        This function is called with first token already read and terminates with next token already read.

        Returns:
            True if a predicate has been read, False if nothing to process
        """
        if self.token is not TOKEN_KEYWORD_PREDICATE:
            return False

        # Skip predicate declaration
        while self.token not in (TOKEN_SEMICOLON, TOKEN_EOF):
            self._next_token()
        if self.token is not TOKEN_SEMICOLON:
            self._raise_exception("Semicolon ';' expected at the end of a predicate declaration.")
        self._next_token()
        return True

    def _read_parameter_or_variable(self):
        """ Read a parameter or variable declaration

        This function is called with first token already read and terminates with next token already read.

        Returns:
            True if a parameter or a variable has been read, False if nothing to process
        """
        tok = self.token
        if tok.type is not TOKEN_TYPE_KEYWORD:
            return False

        # Read array size if any
        arsize = self._read_array_size()

        # Check if variable declaration
        tok = self.token
        if tok is TOKEN_KEYWORD_VAR:
            self._next_token()
            return self._read_variable(arsize)

        # Check type name
        if tok not in (TOKEN_KEYWORD_BOOL, TOKEN_KEYWORD_FLOAT, TOKEN_KEYWORD_INT, TOKEN_KEYWORD_SET):
            return False
        typ = tok
        if typ is TOKEN_KEYWORD_SET:
            # Only 'set of int' is supported
            self._check_token(self._next_token(), TOKEN_KEYWORD_OF)
            self._check_token(self._next_token(), TOKEN_KEYWORD_INT)

        # Check separating colon
        self._check_token(self._next_token(), TOKEN_COLON)

        # Check parameter name
        tok = self._next_token()
        if tok.type is not TOKEN_TYPE_SYMBOL:
            self._raise_exception("Symbol expected as parameter name.")
        pid = tok.value
        self._check_token(self._next_token(), TOKEN_ASSIGN)

        # Read expression
        self._next_token()
        expr = self._read_expression()
        # NOTE(review): the nesting of the two tests below is reconstructed from a
        # flattened listing; confirm against the reference source that they are siblings.
        if arsize:
            expr = list(expression._domain_iterator(expr))
        if typ is TOKEN_KEYWORD_SET:
            arsize = len(expr)
        self._check_token(self.token, TOKEN_SEMICOLON)
        self._next_token()

        # Build result
        fp = FznParameter(pid, typ.value, arsize, expr)
        self.var_map[pid] = fp
        self.parameters.append(fp)
        return True

    def _read_variable(self, arsize):
        """ Read a variable declaration

        This function is called with the token following 'var' already read and terminates
        with next token already read.

        Args:
            arsize: Array size if any
        Returns:
            True (a variable is always read, otherwise an exception is raised)
        """
        # Read type and domain
        typ, dom = self._read_var_domain()
        self._check_token(self.token, TOKEN_COLON)
        tok = self._next_token()
        if tok.type is not TOKEN_TYPE_SYMBOL:
            self._raise_exception("Symbol expected as variable name.")
        vid = tok.value
        self._next_token()

        # Check annotations
        annotations = self._read_annotations()

        # Check expression
        expr = None
        if self.token is TOKEN_ASSIGN:
            self._next_token()
            expr = self._read_expression()

        # Read ending semicolon
        self._check_token(self.token, TOKEN_SEMICOLON)
        self._next_token()

        # Create variable
        fv = FznVariable(vid, typ.value, dom, annotations, arsize, expr)
        self.var_map[vid] = fv
        self.variables.append(fv)
        return True

    def _read_constraint(self):
        """ Read a constraint

        This function is called with first token already read and terminates with next token already read.

        Returns:
            True if a constraint has been read, False if nothing to process
        """
        # Check constraint token
        if self.token is not TOKEN_KEYWORD_CONSTRAINT:
            return False

        # Read constraint name
        tok = self._next_token()
        if tok.type is not TOKEN_TYPE_SYMBOL:
            self._raise_exception("Constraint name '{}' should be a symbol.".format(tok))
        cname = tok.value

        # Read parameters
        args = []
        self._check_token(self._next_token(), TOKEN_PARENT_OPEN)
        self._next_token()
        while self.token is not TOKEN_PARENT_CLOSE:
            args.append(self._read_expression())
            if self.token is TOKEN_COMMA:
                self._next_token()
        # Skip closing parenthesis
        self._next_token()

        # Check annotations
        annotations = self._read_annotations()

        # Read ending semicolon
        self._check_token(self.token, TOKEN_SEMICOLON)
        self._next_token()

        # Store constraint
        self.constraints.append(FznConstraint(cname, args, annotations))
        return True

    def _read_objective(self):
        """ Read solve objective

        This function is called with first token already read and terminates with next token already read.

        Returns:
            True if an objective has been read, False if nothing to process
        """
        # Check solve token
        if self.token is not TOKEN_KEYWORD_SOLVE:
            return False
        self._next_token()

        # Check annotations
        annotations = self._read_annotations()

        # Read solve objective
        tok = self.token
        if tok not in (TOKEN_KEYWORD_SATISFY, TOKEN_KEYWORD_MINIMIZE, TOKEN_KEYWORD_MAXIMIZE):
            self._raise_exception(
                "Solve objective '{}' should be a symbol in 'satisfy', 'minimize', 'maximize'.".format(tok))
        obj = tok
        self._next_token()

        # Read expression if any
        expr = None if obj is TOKEN_KEYWORD_SATISFY else self._read_expression()

        # Read ending semicolon
        self._check_token(self.token, TOKEN_SEMICOLON)
        self._next_token()

        # Store objective
        self.objective = FznObjective(obj.value, expr, annotations)
        return True

    def _read_expression(self):
        """ Read an expression

        First expression token is already read.
        Function exits with current token following the last expression token

        Returns:
            Expression that has been read, as:
             * int for an integer constant, tuple (v,) or (min, max) for an integer interval,
             * float for a float constant, bool for 'true' and 'false',
             * list of int for a set of integers in braces,
             * tuple (element, index) for an array access,
             * tuple (name, arg1, arg2, ...) for a function call,
             * element registered in var_map for a known symbol, the symbol string itself otherwise,
             * list of expressions for an array.
        """
        tok = self.token
        self._next_token()

        # Check int constant
        if tok.type is TOKEN_TYPE_INTEGER:
            v1 = int(tok.value)
            # Check set const
            if self.token is TOKEN_INTERVAL:
                tok2 = self._next_token()
                if tok2.type is not TOKEN_TYPE_INTEGER:
                    self._raise_exception("Set upper bound {} should be an integer constant.".format(tok2))
                self._next_token()
                v2 = int(tok2.value)
                return (v1,) if v1 == v2 else (v1, v2)
            else:
                return v1

        # Check float constant
        if tok.type is TOKEN_TYPE_FLOAT:
            return float(tok.value)

        # Set of integer constant
        if tok is TOKEN_BRACE_OPEN:
            lints = []
            tok = self.token
            while tok is not TOKEN_BRACE_CLOSE:
                if tok.type is not TOKEN_TYPE_INTEGER:
                    self._raise_exception("Set element {} should be an integer constant.".format(tok))
                lints.append(int(tok.value))
                tok = self._next_token()
                if tok is TOKEN_COMMA:
                    tok = self._next_token()
            # Skip closing brace
            self._next_token()
            return lints

        # Check symbols
        if tok.type is TOKEN_TYPE_SYMBOL:
            sid = tok.value
            # Check array access
            if self.token is TOKEN_HOOK_OPEN:
                # Access corresponding FZN element
                elem = self.var_map.get(sid)
                if elem is None:
                    self._raise_exception("Unknown symbol '{}'.".format(sid))
                tok2 = self._next_token()
                if tok2.type is not TOKEN_TYPE_INTEGER:
                    self._raise_exception("Array index '{}' should be an integer constant.".format(tok2))
                self._check_token(self._next_token(), TOKEN_HOOK_CLOSE)
                self._next_token()
                # Build array access as a tuple (array element, index)
                return (elem, int(tok2.value))
            # Check annotation function call
            elif self.token is TOKEN_PARENT_OPEN:
                lexprs = [sid]
                self._next_token()
                while self.token is not TOKEN_PARENT_CLOSE:
                    lexprs.append(self._read_expression())
                    if self.token is TOKEN_COMMA:
                        self._next_token()
                # Skip closing parenthesis
                self._next_token()
                return tuple(lexprs)
            else:
                # Replace by the FZN element if known, keep the symbol string otherwise
                return self.var_map.get(sid, sid)

        # Array of expressions
        if tok is TOKEN_HOOK_OPEN:
            lexprs = []
            while self.token is not TOKEN_HOOK_CLOSE:
                lexprs.append(self._read_expression())
                if self.token is TOKEN_COMMA:
                    self._next_token()
            # Skip closing hook
            self._next_token()
            return lexprs

        # Check boolean constant
        if tok is TOKEN_KEYWORD_TRUE:
            return True
        if tok is TOKEN_KEYWORD_FALSE:
            return False

        # Unknown
        self._raise_exception("Invalid expression start: '{}'.".format(tok))

    def _read_array_size(self):
        """ Read an array size declaration

        First expression token is already read.
        Function exits with current token following the last expression token

        Returns:
            Array size as int if given, -1 if size is not specified ('array [int]'),
            None if no array specified
        """
        # Check array token
        if self.token is not TOKEN_KEYWORD_ARRAY:
            return None

        # Read array specs
        self._check_token(self._next_token(), TOKEN_HOOK_OPEN)
        tok = self._next_token()
        if tok is TOKEN_KEYWORD_INT:
            arsize = -1
        else:
            # Lower bound must be the integer constant 1: reject anything else
            if (tok.type is not TOKEN_TYPE_INTEGER) or tok.value != '1':
                self._raise_exception("Array size should start by '1'")
            self._check_token(self._next_token(), TOKEN_INTERVAL)
            tok = self._next_token()
            if tok.type is not TOKEN_TYPE_INTEGER:
                self._raise_exception("Array size '{}' should be integer.".format(tok))
            arsize = int(tok.value)
        self._check_token(self._next_token(), TOKEN_HOOK_CLOSE)
        self._check_token(self._next_token(), TOKEN_KEYWORD_OF)
        self._next_token()
        return arsize

    def _read_var_domain(self):
        """ Read the domain of a variable.

        First expression token is already read.
        Function exits with current token following the last expression token

        Returns:
            Type token and variable domain
        """
        # Get token
        tok = self.token
        typ = tok

        # Check boolean domain
        if typ is TOKEN_KEYWORD_BOOL:
            self._next_token()
            return typ, BINARY_DOMAIN

        # Check undefined domain
        if typ is TOKEN_KEYWORD_INT:
            self._next_token()
            return typ, DEFAULT_INTEGER_VARIABLE_DOMAIN

        # Read set of integers
        if tok is TOKEN_BRACE_OPEN:
            # Remove duplicates, that would otherwise produce overlapping domain items
            lint = sorted(set(self._read_expression()))
            dom = []
            llen = len(lint)
            i = 0
            while i < llen:
                # Search the end of the run of consecutive values starting at index i
                j = i + 1
                while (j < llen) and (lint[j] == lint[j - 1] + 1):
                    j += 1
                if j > i + 1:
                    dom.append((lint[i], lint[j - 1]))
                else:
                    dom.append(lint[i])
                i = j
            return TOKEN_KEYWORD_INT, tuple(dom)

        # Check integer domain
        if tok.type is not TOKEN_TYPE_INTEGER:
            self._raise_exception("Variable domain should start by an integer constant.")
        self._next_token()
        if self.token is TOKEN_INTERVAL:
            tok2 = self._next_token()
            if tok2.type is not TOKEN_TYPE_INTEGER:
                self._raise_exception("Domain upper bound {} should be an integer constant.".format(tok2))
            self._next_token()
            v1 = int(tok.value)
            v2 = int(tok2.value)
            if v1 == v2:
                return TOKEN_KEYWORD_INT, (v1,)
            return TOKEN_KEYWORD_INT, ((v1, v2),)
        else:
            return TOKEN_KEYWORD_INT, (int(tok.value),)

    def _read_annotations(self):
        """ Read a list of annotations

        First expression token is already read.
        Function exits with current token following the last expression token

        Returns:
            Dictionary of annotations. Key is name, value is tuple of parameters
            (empty tuple if the annotation has no parameter).
        """
        result = {}
        # Check annotation start token
        while self.token is TOKEN_DOUBLECOLON:
            # Read annotation name
            anm = self._next_token()
            if anm.type is not TOKEN_TYPE_SYMBOL:
                self._raise_exception("Annotation name '{}' should be a symbol.".format(anm))
            args = []
            tok = self._next_token()
            if tok is TOKEN_PARENT_OPEN:
                self._next_token()
                while self.token is not TOKEN_PARENT_CLOSE:
                    args.append(self._read_expression())
                    if self.token is TOKEN_COMMA:
                        self._next_token()
                # Skip closing parenthesis
                self._next_token()
            result[anm.value] = tuple(args)
        return result

    def _next_token(self):
        """ Read next token

        Returns:
            Token that has been read, also stored as current token
        """
        self.token = self.tokenizer.next_token()
        return self.token

    def _check_token(self, tok, etok):
        """ Check that a read token is a given one and raise an exception if not

        Args:
            tok:  Read token
            etok: Expected token
        Raises:
            FznParserException: if the read token is not the expected one
        """
        if tok is not etok:
            self._raise_exception("Read token '{}' instead of expected '{}'".format(tok, etok))

    def _raise_exception(self, msg):
        """ Raise a Parsing exception

        The message is completed by the tokenizer before being put in the exception.

        Args:
            msg: Exception message
        Raises:
            FznParserException: always
        """
        raise FznParserException(self.tokenizer.build_error_string(msg))
class FznParser(object):
    """ Parser of FZN file format

    Uses a FznReader to read the FZN source, then compiles what has been read
    into a CPO model when the model is requested.
    """
    __slots__ = ('model',           # Read model
                 'compiled',        # Model compiled indicator
                 'reader',          # FZN reader
                 'cpo_exprs',       # Dictionary of CPO expressions. Key=name, value=CPO expr
                 'reduce',          # Reduce model indicator
                 'interval_gen',    # Name generator for interval var expressions
                 'cumul_gen',       # Name generator for cumul atom expressions

                 'parameters',      # List of parameters
                 'variables',       # List of variables
                 'constraints',     # List of model constraints
                 'objective',       # Model objective
                 'cur_constraint',  # Currently compiled constraint descriptor

                 'def_var_exprs',   # List of expressions waiting for defvars to be defined
                 'cpo_variables',   # Set of names of variables that are translated as real CPO variables
                 )

    def __init__(self, mdl=None):
        """ Create a new FZN format parser

        Only the attributes needed for reading are set here. Attributes 'cpo_exprs',
        'parameters', 'variables', 'constraints', 'objective', 'def_var_exprs' and
        'cpo_variables' are assigned when the model is compiled.

        Args:
            mdl: Model to fill, None (default) to create a new one.
        """
        super(FznParser, self).__init__()
        if mdl is None:
            mdl = CpoModel()
        self.model = mdl
        self.compiled = False
        self.reader = FznReader()
        self.interval_gen = IdAllocator("IntervalVar_")
        self.cumul_gen = IdAllocator("VarCumulAtom_")

        # Do not store location information (would store parser instead of real lines)
        mdl.source_loc = False

        # Set model reduction indicator
        self.reduce = config.context.parser.fzn_reduce
def get_model(self):
    """ Get the model that has been parsed

    The FZN model is compiled into the CPO model the first time this method
    is called. Following calls return the same model without compiling again.

    Return:
        CpoModel result of the parsing
    """
    if self.compiled:
        return self.model
    # Indicator is set before compiling, as in the reference implementation
    self.compiled = True
    self._compile_to_model()
    return self.model
def parse(self, cfile):
    """ Parse a FZN file

    The file is recorded as source file of the model if the model has none yet,
    then it is read by the FZN reader.

    Args:
        cfile: FZN file to read
    Raises:
        FznParserException: Parsing exception
    """
    mdl = self.model
    if mdl.source_file is None:
        # Only the first parsed file is kept as model source file
        mdl.source_file = cfile
    self.reader.parse(cfile)
def parse_string(self, str):
    """ Parse a string

    The string is handed to the FZN reader, which adds what it reads to
    what it has already read.

    Args:
        str: String to parse
    """
    reader = self.reader
    reader.parse_string(str)
def get_output_variables(self):
    """ Get the list of model output variables

    The list of variables is assigned on the parser when the FZN model is compiled,
    so compilation is triggered first if it has not been done yet.

    Returns:
        List of output variables, in declaration order.
    """
    # Make sure the model is compiled: self.variables is not assigned before
    self.get_model()
    return [v for v in self.variables if v.is_output()]
def _write_model(self, out=None): """ Print read model (short version) Args: out (optional): Output stream. Default is stdout """ if out is None: out = sys.stdout out.write(self.get_model().get_cpo_string(short_output=True)) out.write("\n") def _get_cpo_expr_map(self): """ For testing, get the map of CPO expressions """ self.get_model() return self.cpo_exprs def _compile_to_model(self): """ Compile FZN model into CPO model """ # Initialize processing self.cpo_exprs = {} self.parameters = self.reader.parameters self.variables = self.reader.variables self.constraints = self.reader.constraints self.objective = self.reader.objective self.def_var_exprs = {} self.cpo_variables = set() # Reduce model if required if self.reduce: self._reduce_model() # print("=== Variables:") # for v in self.variables: # print(" : {}".format(v)) # print("=== Constraints:") # for c in self.constraints: # print(" : {}".format(c)) # sys.stdout.flush() # Compile parameters for x in self.parameters: self._compile_parameter(x) # Compile variables for x in self.variables: self._compile_variable(x) # Compile constraints if self.reduce: for x in self.constraints: if isinstance(x, FznVariable): self._compile_variable(x) else: self._compile_constraint(x) else: for x in self.constraints: self._compile_constraint(x) # Compile objective self._compile_objective(self.objective) def _compile_parameter(self, fp): """ Compile a FZN parameter into CPO model Args: fp: Flatzinc parameter, object of class FznParameter """ if fp.type in ('int', 'bool'): expr = CpoValue(fp.value, Type_IntArray if fp.size else Type_Int) elif fp.type == 'float': expr = CpoValue(fp.value, Type_FloatArray if fp.size else Type_Float) else: expr = build_cpo_expr(fp.value) # Add to map expr.set_name(fp.name) self.cpo_exprs[fp.name] = expr def _compile_variable(self, fv): """ Compile a FZN variable into CPO model Args: fv: Flatzinc variable """ # Check if variable is array val = fv.value if fv.size: # Build array of variables if val: # 
Check if there is a reference to a not yet defined variable if self.reduce: for v in val: if v in self.def_var_exprs: self.def_var_exprs[v].append(fv) return arr = [self._get_cpo_expr(e) for e in val] expr = CpoValue(arr, Type_IntVarArray if all(x.type == Type_IntVar for x in arr) else Type_IntExprArray) else: # Build array of variables arr = [integer_var(name=fv.name + '[' + str(i + 1) + ']', domain=fv.domain) for i in range(fv.size)] expr = CpoValue(arr, Type_IntVarArray) else: # Build single variable if self.reduce and val: if is_int(val): expr = CpoValue(val, Type_Int) elif is_bool(val): expr = CpoValue(val, Type_Bool) else: expr = self._get_cpo_expr(val) else: # Check if value is another variable if isinstance(val, FznVariable): # Retrieve existing variable expr = self.cpo_exprs.get(val.name) assert isinstance(expr, CpoIntVar), "Variable '{}' not found".format(val.name) else: # Create new variable dom = _build_domain(val) if val else fv.domain expr = integer_var(domain=dom) expr.set_name(fv.name) self.cpo_exprs[fv.name] = expr def _compile_constraint(self, fc): """ Compile a FZN constraint into CPO model Args: fv: Flatzinc constraint """ # Search in local methods cmeth = getattr(self, "_compile_pred_" + fc.predicate, None) if not cmeth: raise FznParserException("Predicate '{}' is not supported.".format(fc.predicate)) # Call compile method cmeth(fc) def _compile_objective(self, fo): """ Compile a FZN objective into CPO model Args: fo: Flatzinc objective """ #print("Compile objective {}".format(fo)) if fo is None: return if fo.operation != 'satisfy': expr = self._get_cpo_expr(fo.expr) oxpr = modeler.maximize(expr) if fo.operation == 'maximize' else modeler.minimize(expr) self._add_to_model(oxpr) def _reduce_model(self): """ Reduce model size by factorizing expressions when possible """ # Access main model elements variables = self.variables constraints = self.constraints # Build reduction data related to variables for fv in variables: # Build list of variables 
that are referenced by this one fv.ref_vars = tuple(v for v in fv.value if isinstance(v, FznVariable)) if fv.size else () # Set in defined variables if output if fv.is_output(): self.cpo_variables.add(fv.name) # In constraints, replace reference to arrays by arrays themselves for fc in constraints: fc.args = tuple(a.value if isinstance(a, FznVariable) and a.size else a for a in fc.args) # Initialize set of variables defined in constraints def_var_map = {} # Key is variable, value is constraint where variable is defined for fc in constraints: # print("Scan constraint {}".format(fc)) defvar = fc.defvar if defvar is not None: def_var_map[defvar] = fc # Build list of all variables referenced by this constraint res = set() nbrefdvar = 0 for v in fc._ref_vars_iterator(): if v is defvar: nbrefdvar += 1 else: res.add(v) fc.ref_vars = tuple(res) # Special case for cumulative. All variables are supposed defined by the constraint. if fc.predicate == 'cumulative': for v in fc.ref_vars: def_var_map[v] = fc # print(" result list of ref variables: {}".format(fc.ref_vars)) # If defined variable is referenced twice in the constraint, remove it as defined (keep as declared variable) if nbrefdvar > 1: fc.defvar = None self.cpo_variables.add(defvar.name) # Determine connected variable subsets #self._determine_connex_variables(def_var_map) # Scan variables to move them after definition of their dependencies when needed variables = [] # New list of variables for fv in self.variables: #print("Scan variable {}, Refvars: {}".format(fv, [v.name for v in fv.ref_vars])) if any(v in def_var_map for v in fv.ref_vars): if self._insert_in_constraints(fv, 0, def_var_map): # Remove from list of variables (moved in constraints) pass else: # Keep it as a model variable self.cpo_variables.add(fv.name) variables.append(fv) else: variables.append(fv) self.variables = variables # Reorder constraints #print("\nScan constraints. 
Defined vars: {}".format([v.name for v in defined_vars])) constraints = self.constraints nbct = len(constraints) movedcstr = set() # Constraints already moved cx = 0 while cx < nbct: fc = constraints[cx] #print("Scan constraint {}. Refvars: {}".format(fc, [v.name for v in fc.ref_vars])) #print("Defined vars: {}".format([v.name for v in defined_vars])) # Process case of variable that has been inserted in constraints if isinstance(fc, FznVariable): def_var_map.pop(fc, None) else: # Search if constraint can be moved if (fc not in movedcstr) and (fc.predicate != "cumulative") \ and any(v in def_var_map for v in fc.ref_vars) \ and self._insert_in_constraints(fc, cx + 1, def_var_map): # Move constraint after all is defined del constraints[cx] movedcstr.add(fc) cx -= 1 else: # Constraint stays where it is if fc.defvar: def_var_map.pop(fc.defvar, None) cx += 1 # print("Reduction ended.") # print(" Variables:") # for v in self.variables: # print(" {}".format(v)) # print(" Constraints:") # for c in self.constraints: # print(" {}".format(c)) def _insert_in_constraints(self, fc, cx, varsctsr): """ Insert a constraint or a variable in constraints after all its members are defined Args: fc: FZN constraint or variable to insert cx: Start insertion index varsctsr: Map of constraints where each variable is defined Return: True if insertion was successful, False otherwise """ # Build set of constraints to skip to wait for variable definition cset = set(varsctsr[v] for v in fc.ref_vars if v in varsctsr) # Check no dependency if not cset: return False # Search in next constraints constraints = self.constraints for ix in xrange(cx, len(constraints)): cset.discard(constraints[ix]) if not cset: # Insert after this place constraints.insert(ix + 1, fc) # print(" inserted at best rank {}".format(cx + 1)) return True # Impossible to insert # print(" impossible to insert") return False def _determine_connex_variables(self, def_var_map): """ Search connex sub-graphes in variables dependencies 
and put all multi-variables connex parts as cpo variables Args: def_var_map: Map of constraints where variable is defined """ # Create result map. Key is variable descriptor, value is set of connected variable descriptors lcomps = {} # For each variable, build the list of depending variables for fv in self.variables: stack = [fv] dset = set(stack) while stack: sv = stack.pop(0) for v in sv.ref_vars: if v not in dset: if v in lcomps: dset.update(lcomps[v]) else: dset.add(v) stack.append(v) fc = def_var_map.get(fv) if fc: for v in fc.ref_vars: if v not in dset: #print(" cstr.ref: {}".format(v)) if v in lcomps: dset.update(lcomps[v]) else: dset.add(v) stack.append(v) lcomps[fv] = dset # Sort all variables list in ascending cardinality order #print("Variables graphs:") #for k, v in lcomps.items(): # print(" {}: {}".format(k.name, [x.name for x in v])) lcomps = sorted(lcomps.values(), key=lambda x: len(x)) # Remove each set in next ones nbcomps = len(lcomps) for x1 in range(nbcomps): s1 = lcomps[x1] if s1: for x2 in range(x1 + 1, nbcomps): lcomps[x2] = lcomps[x2].difference(s1) # Set as model variables all that are in component with at least two variables for c in lcomps: #print(" graph component: {}".format([x.name for x in c])) if len(c) > 1: for v in c: self.cpo_variables.add(v.name) #print("CPO model variables identified: {}".format([v for v in self.cpo_variables])) def _get_cpo_expr(self, expr): """ Retrieve a CPO expression from its FZN representation Args: expr: FZN expression Returns: Corresponding CPO expression """ #print(" _get_cpo_expr({}, type={})".format(expr, type(expr))) # Check basic types etyp = type(expr) if etyp in (FznVariable, FznParameter): v = self.cpo_exprs.get(expr.name) if v is None: raise FznParserException("Can not find element {}".format(expr.name)) return v # Integer constant if etyp in INTEGER_TYPES: return CpoValue(expr, Type_Int) # Array if etyp is list: return build_cpo_expr([self._get_cpo_expr(x) for x in expr]) # Check array access 
if etyp is tuple: # Check tuple of integers if is_int(expr[0]): if len(expr) > 1: return [i for i in range(expr[0], expr[1] + 1)] return [expr[0]] # Access to array element arr = self.cpo_exprs.get(expr[0].name) if arr is None: raise FznParserException("Can not find array {}".format(expr[0])) return(arr.value[expr[1]-1]) # Boolean if etyp in BOOL_TYPES: return CpoValue(1, Type_Bool) if expr else CpoValue(0, Type_Bool) # String if etyp in STRING_TYPES: v = self.cpo_exprs.get(expr) if v is None: raise FznParserException("Can not find element {}".format(expr)) return v # Unknown raise FznParserException("Can not find element {}".format(expr)) # Array predicates def _compile_pred_array_bool_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_int_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_float_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_var_bool_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_var_int_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_var_float_element(self, fc): self._compile_array_xxx_element(fc) def _compile_pred_array_bool_and(self, fc): self._compile_op_assign_arg_1(fc, modeler.min_of) def _compile_pred_array_bool_or(self, fc): self._compile_op_assign_arg_1(fc, modeler.max_of) # Bool predicates def _compile_pred_bool_and(self, fc): self._compile_op_assign_arg_2(fc, modeler.logical_and) def _compile_pred_bool_or(self, fc): self._compile_op_assign_arg_2(fc, modeler.logical_or) def _compile_pred_bool_xor(self, fc): self._compile_op_assign_arg_2(fc, modeler.diff) def _compile_pred_bool_not(self, fc): self._compile_op_assign_arg_1(fc, modeler.logical_not) def _compile_pred_bool_eq(self, fc): self._compile_xxx_eq(fc) def _compile_pred_bool_eq_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.equal) def _compile_pred_bool_le(self, fc): self._compile_op_arg_2(fc, modeler.less_or_equal) def 
_compile_pred_bool_le_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less_or_equal) def _compile_pred_bool_lin_eq(self, fc): self._compile_scal_prod(fc, modeler.equal) def _compile_pred_bool_lin_le(self, fc): self._compile_scal_prod(fc, modeler.less_or_equal) def _compile_pred_bool_lt(self, fc): self._compile_op_arg_2(fc, modeler.less) def _compile_pred_bool_lt_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less) def _compile_pred_bool2int(self, fc): self._compile_xxx_eq(fc) # Float predicates def _compile_pred_float_abs(self, fc): self._compile_op_assign_arg_1(fc, modeler.abs_of) def _compile_pred_float_exp(self, fc): self._compile_op_assign_arg_1(fc, modeler.exponent) def _compile_pred_float_ln(self, fc): self._compile_op_assign_arg_1(fc, modeler.log) def _compile_pred_float_log10(self, fc): a, r = fc.args self._make_equal(r, modeler.log(self._get_cpo_expr(a)) / modeler.log(10), fc.defvar) def _compile_pred_float_log2(self, fc): a, r = fc.args self._make_equal(r, modeler.log(self._get_cpo_expr(a)) / modeler.log(2), fc.defvar) def _compile_pred_float_sqrt(self, fc): a, r = fc.args self._make_equal(r, modeler.power(self._get_cpo_expr(a), 0.5), fc.defvar) def _compile_pred_float_eq(self, fc): self._compile_xxx_eq(fc) def _compile_pred_float_eq_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.equal) def _compile_pred_float_le(self, fc): self._compile_op_arg_2(fc, modeler.less_or_equal) def _compile_pred_float_le_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less_or_equal) def _compile_pred_float_lin_eq(self, fc): self._compile_scal_prod(fc, modeler.equal) def _compile_pred_float_lin_le(self, fc): self._compile_scal_prod(fc, modeler.less_or_equal) def _compile_pred_float_lin_lt(self, fc): self._compile_scal_prod(fc, modeler.lesst) def _compile_pred_float_lin_ne(self, fc): self._compile_scal_prod(fc, modeler.diff) def _compile_pred_float_lin_eq_reif(self, fc): self._compile_scal_prod(fc, modeler.equal, True) def 
_compile_pred_float_lin_le_reif(self, fc): self._compile_scal_prod(fc, modeler.less_or_equal, True) def _compile_pred_float_lin_lt_reif(self, fc): self._compile_scal_prod(fc, modeler.less, True) def _compile_pred_float_lin_ne_reif(self, fc): self._compile_scal_prod(fc, modeler.diff, True) def _compile_pred_float_lt(self, fc): self._compile_op_arg_2(fc, modeler.less) def _compile_pred_float_lt_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less) def _compile_pred_float_max(self, fc): self._compile_op_assign_arg_2(fc, modeler.max_of) def _compile_pred_float_min(self, fc): self._compile_op_assign_arg_2(fc, modeler.min_of) def _compile_pred_float_ne(self, fc): self._compile_op_arg_2(fc, modeler.diff) def _compile_pred_float_ne_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.diff) def _compile_pred_float_plus(self, fc): self._compile_op_assign_arg_2(fc, modeler.plus) # Int predicates def _compile_pred_int_abs(self, fc): self._compile_op_assign_arg_1(fc, modeler.abs_of) def _compile_pred_int_div(self, fc): self._compile_op_assign_arg_2(fc, modeler.int_div) def _compile_pred_int_eq(self, fc): self._compile_xxx_eq(fc) def _compile_pred_int_eq_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.equal) def _compile_pred_int_le(self, fc): self._compile_op_arg_2(fc, modeler.less_or_equal) def _compile_pred_int_le_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less_or_equal) def _compile_pred_int_lin_eq(self, fc): self._compile_scal_prod(fc, modeler.equal) def _compile_pred_int_lin_le(self, fc): self._compile_scal_prod(fc, modeler.less_or_equal) def _compile_pred_int_lin_ne(self, fc): self._compile_scal_prod(fc, modeler.diff) def _compile_pred_int_lin_eq_reif(self, fc): self._compile_scal_prod(fc, modeler.equal, True) def _compile_pred_int_lin_le_reif(self, fc): self._compile_scal_prod(fc, modeler.less_or_equal, True) def _compile_pred_int_lin_ne_reif(self, fc): self._compile_scal_prod(fc, modeler.diff, True) def _compile_pred_int_lt(self, 
fc): self._compile_op_arg_2(fc, modeler.less) def _compile_pred_int_lt_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.less) def _compile_pred_int_max(self, fc): self._compile_op_assign_arg_2(fc, modeler.max_of) def _compile_pred_int_min(self, fc): self._compile_op_assign_arg_2(fc, modeler.min_of) def _compile_pred_int_mod(self, fc): self._compile_op_assign_arg_2(fc, modeler.mod) def _compile_pred_int_ne(self, fc): self._compile_op_arg_2(fc, modeler.diff) def _compile_pred_int_ne_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.diff) def _compile_pred_int_plus(self, fc): self._compile_op_assign_arg_2(fc, modeler.plus) def _compile_pred_int_times(self, fc): self._compile_op_assign_arg_2(fc, modeler.times) def _compile_pred_int2float(self, fc): self._compile_xxx_eq(fc) # Set predicates def _compile_pred_set_in(self, fc): self._compile_op_arg_2(fc, modeler.allowed_assignments) def _compile_pred_set_in_reif(self, fc): self._compile_op_assign_arg_2(fc, modeler.allowed_assignments) # Custom predicates def _compile_pred_all_different_int(self, fc): self._compile_op_arg_1(fc, modeler.all_diff) def _compile_pred_count_eq_const(self, fc): self._compile_op_assign_arg_2(fc, modeler.count) def _compile_pred_lex_lesseq_int(self, fc): self._compile_op_arg_2(fc, modeler.lexicographic) def _compile_pred_lex_lesseq_bool(self, fc): self._compile_op_arg_2(fc, modeler.lexicographic) def _compile_pred_int_pow(self, fc): self._compile_op_assign_arg_2(fc, modeler.power) def _compile_pred_cumulative(self, fc): """ Requires that a set of tasks given by start times s, durations d, and resource requirements r, never require more than a global resource bound b at any one time. 
Args: fc: Constraint descriptor, with arguments stime: Tasks start time tdur: Tasks tasks durations rreq: Task resource requirements bnd: Global resource bound """ #print("Process cumulative constraint {}".format(fc)) # Access constraint arguments stime, tdur, rreq, bnd = fc.args # Create interval vars and cumul atoms ls = _get_fzn_array(stime) ld = _get_fzn_array(tdur) lr = _get_fzn_array(rreq) #print("ls: {}\nld: {}\nlr: {}".format(ls, [s for s in ld], lr)) cumul_atoms = [] for s, d, r in zip(ls, ld, lr): vname = None # Get start time if isinstance(s, FznVariable): ds = s._get_domain_bounds() vname = s.name else: ds = (s, s) # Get duration if isinstance(d, FznVariable): dd = d._get_domain_bounds() if vname is None: vname = d.name else: dd = (d, d) # Get requirement if isinstance(r, FznVariable): dr = r._get_domain_bounds() if vname is None: vname = r.name else: dr = (r, r) #print("ds: {}, dd: {}, dr: {}".format(ds, dd, dr)) # Create interval variable if vname is None: vname = self.interval_gen.allocate() else: vname = "Itv_" + vname if vname in self.cpo_exprs: cnt = 1 nname = vname + "_1" while nname in self.cpo_exprs: cnt += 1 nname = vname + "_" + str(cnt) vname = nname # Create interval variable ivar = self.cpo_exprs.get(vname) if ivar: # Check it is the same assert isinstance(ivar, CpoIntervalVar) and ivar.get_start() == ds and ivar.get_size() == dd and ivar.get_end() == (INTERVAL_MIN, INTERVAL_MAX) else: ivar = interval_var(start=ds, end=(INTERVAL_MIN, INTERVAL_MAX), size=dd, name=vname) self.cpo_exprs[vname] = ivar # Create pulse pulse = modeler.pulse(ivar, dr) cumul_atoms.append(pulse) # Replace previous variable by access to interval variable if isinstance(s, FznVariable): self._assign_to_var(s, modeler.start_of(ivar)) if isinstance(d, FznVariable): self._assign_to_var(d, modeler.size_of(ivar)) if isinstance(r, FznVariable): self._assign_to_var(r, modeler.height_at_start(ivar, pulse)) # Create final constraint cumf = CpoFunctionCall(Oper_sum, 
Type_CumulFunction, (CpoValue(cumul_atoms, Type_CumulAtomArray),)) self._add_to_model(modeler.greater_or_equal(self._get_cpo_expr(bnd), cumf)) def _compile_pred_subcircuit(self, fc): """ Constrains the elements of x to define a subcircuit where x[i] = j means that j is the successor of i and x[i] = i means that i is not in the circuit. """ x = [0] + list(self._get_cpo_expr(fc.args[0]).value) self._add_to_model(CpoFunctionCall(Oper_sub_circuit, Type_Constraint, (build_cpo_expr(x),) )) def _compile_pred_inverse(self, fc): """ Constrains two arrays of int variables, f and invf, to represent inverse functions. All the values in each array must be within the index set of the other array. Args: f: First function as array of int invf: Inverse function """ f, invf = fc.args f = [0] + list(self._get_cpo_expr(f).value) invf = [0] + list(self._get_cpo_expr(invf).value) self._add_to_model(CpoFunctionCall(Oper_inverse, Type_Constraint, (build_cpo_expr(f), build_cpo_expr(invf)))) def _compile_pred_bool_clause(self, fc): """ Implementation of bool_clause predicate Args: a: First array of booleans b: Second array of booleans """ a, b = fc.args # Default implementation exprs = list(self._get_cpo_expr(a).value) for x in self._get_cpo_expr(b).value: exprs.append(1 - x) self._add_to_model(modeler.sum_of(exprs) >= 1) # Alternative implementation # self._add_to_model( (modeler.max_of(a) > 0) | (modeler.min_of(b) == 0) ) # Other alternative implementation # expr = None # for x in _get_value(a): # x = x > 0 # expr = x if expr is None else modeler.logical_or(expr, x) # for x in _get_value(b): # x = x == 0 # expr = x if expr is None else modeler.logical_or(expr, x) # self._add_to_model(expr) def _compile_pred_table_int(self, fc): """ Implement custom predicate table_int Args: vars: Array of variables values: List of values """ vars, values = fc.args # Split value array in tuples vars = self._get_cpo_expr(vars).value tsize = len(vars) if tsize != 0: values = self._get_cpo_expr(values).value 
tuples = [values[i: i + tsize] for i in range(0, len(values), tsize)] # Build allowed assignment expression self._add_to_model(modeler.allowed_assignments(vars, tuples)) def _compile_pred_lex_less_bool(self, fc): """ Requires that the array vars1 is strictly lexicographically less than array vars2 Args: vars1: First array of variables vars2: Second array of variables """ vars1, vars2 = fc.args # Add 0 and 1 at the end of arrays to force inequality vars1 = list(self._get_cpo_expr(vars1).value) + [1] vars2 = list(self._get_cpo_expr(vars2).value) + [0] self._add_to_model(modeler.lexicographic(vars1, vars2)) def _compile_pred_lex_less_int(self, fc): self._compile_pred_lex_less_bool(fc) def _get_domain_bounds(self, x): """ Get min and max bounds of an expression, integer variable or integer Args: expr: CPO integer variable or expression Returns: Tuple (min, max) """ # Case of variable or variable replaced by an expression if isinstance(x, CpoIntVar): return (x.get_domain_min(), x.get_domain_max()) if isinstance(x, CpoFunctionCall): if x.operation is Oper_start_of: return x.children[0].get_start() if x.operation is Oper_size_of: return x.children[0].get_size() # if x.is_kind_of(Type_BoolExpr): # return (0, 1) cpov = self.cpo_exprs.get(x.name) if isinstance(cpov, CpoIntVar): return (cpov.get_domain_min(), cpov.get_domain_max()) raise FznParserException("Unknow expression to take bounds from: {}".format(x)) if isinstance(x, CpoValue): x = x.value if is_int(x): return (x, x) return x def _assign_to_var(self, var, expr): """ Set a identifier with an expression Args: var: Target FZN variable expr: CPO expression to assign Returns: None. If needed, changes are done in reader context. 
""" #print("_assign_to_var {} expression {}".format(var, expr)) # Retrieve existing expression vname = var.name vexpr = self.cpo_exprs.get(vname) # Check if reduction if (not self.reduce) or (vname in self.cpo_variables) or (vexpr is not None and not isinstance(vexpr, CpoIntVar)): self._add_to_model(modeler.equal(vexpr, expr)) else: # Assign new name to expression self.cpo_exprs[vname] = expr expr.set_name(vname) # Constrain expression to variable domain self._constrain_expr_domain(expr, var) def _make_equal(self, var, expr, dvar): """ Make equal two FZN expressions Args: var: FZN variable or value expr: CPO expression to be equal with dvar: Define variable, None if none Returns: None. If needed, changes are done in reader context. """ # Check if var is not a variable if not isinstance(var, FznVariable): self._add_to_model(modeler.equal(expr, self._get_cpo_expr(var))) return # Check no reduction if not self.reduce or (dvar is not var): self._add_to_model(modeler.equal(self._get_cpo_expr(var), expr)) return # Assign to variable self._assign_to_var(var, expr) def _constrain_expr_domain(self, expr, var): """ Constrain the domain of an expression to the domain of a variable Args: expr: CPO expression to constrain var: CPO or FZN integer var to take domain from """ dom = var.domain #print(" constrain expression {} to domain {}".format(expr, dom)) dmin = expression.get_domain_min(dom) dmax = expression.get_domain_max(dom) # Check boolean expression if dmin == 0 and dmax == 1: return if expr.is_kind_of(Type_BoolExpr) and (dmin <= 0) and (dmax >= 1): return # Add appropriate constraint if len(dom) == 1: # Single segment # Use range if dmin == dmax: self._add_to_model(modeler.equal(expr, dmin)) else: self._add_to_model(modeler.range(expr, dmin, dmax)) else: # Use allowed assignment self._add_to_model(modeler.allowed_assignments(expr, expression._domain_iterator(dom))) def _compile_op_assign_arg_1(self, fc, op): """ Compile operation with single argument equal to a result 
Args: fc: Constraint descriptor op: CPO operation to apply to first argument """ # Access constraint arguments a, r = fc.args # Assign to expression self._make_equal(r, op(self._get_cpo_expr(a)), fc.defvar) def _compile_op_assign_arg_2(self, fc, op): """ Compile operation with two arguments equal to a result Args: fc: Constraint descriptor op: CPO operation to apply to arguments """ # Access constraint arguments a, b, r = fc.args # Assign to expression self._make_equal(r, op(self._get_cpo_expr(a), self._get_cpo_expr(b)), fc.defvar) def _compile_op_arg_1(self, fc, op): """ Compile operation with one arguments and no result Args: fc: Constraint descriptor op: CPO operation to apply to argument """ self._add_to_model(op(self._get_cpo_expr(fc.args[0]))) def _compile_op_arg_2(self, fc, op): """ Compile operation with two arguments and no result Args: fc: Constraint descriptor op: CPO operation to apply to arguments """ a, b = fc.args self._add_to_model(op(self._get_cpo_expr(a), self._get_cpo_expr(b))) def _compile_array_xxx_element(self, fc): """ Compile access to array element """ x, t, r = fc.args if not is_int(x): x = self._get_cpo_expr(x) self._make_equal(r, modeler.element(self._get_cpo_expr(t), x - 1), fc.defvar) def _compile_xxx_eq(self, fc): """ Compile all equality predicates """ # Access constraint arguments a, b = fc.args # Check default case if not self.reduce: self._add_to_model(modeler.equal(self._get_cpo_expr(a), self._get_cpo_expr(b))) return # Process trivial cases if a is b: return None # Retrieve defined variable defvar = fc.defvar if defvar is None: self._add_to_model(modeler.equal(self._get_cpo_expr(a), self._get_cpo_expr(b))) return if defvar is a: self._assign_to_var(a, self._get_cpo_expr(b)) else: self._assign_to_var(b, self._get_cpo_expr(a)) def _compile_scal_prod(self, fc, op, reif=False): """ Compile a scalar product Args: fc: Constraint op: Comparison operation """ # Access constraint arguments if reif: coefs, vars, res, reif = fc.args else: 
coefs, vars, res = fc.args # Check no reduction defvar = fc.defvar if not self.reduce or not defvar: expr = op(modeler.scal_prod(self._get_cpo_expr(coefs), self._get_cpo_expr(vars)), self._get_cpo_expr(res)) if reif: expr = modeler.equal(self._get_cpo_expr(reif), expr) self._add_to_model(expr) return # Get array elements coefs = _get_fzn_array(coefs) vars = _get_fzn_array(vars) # Check if defined variable is the result if defvar is res or defvar is reif: expr = self._build_scal_prod_expr(coefs, vars, 0) else: # Arrange expression to have defined variable on the left vx = vars.index(defvar) vcoef = coefs[vx] vars = vars[:vx] + vars[vx + 1:] coefs = coefs[:vx] + coefs[vx + 1:] expr = res if is_int(res) else self._get_cpo_expr(res) if vcoef < 0: vcoef = -vcoef expr = -expr else: coefs = list([-c for c in coefs]) # Build result expr = self._build_scal_prod_expr(coefs, vars, expr) if vcoef != 1: expr = expr / vcoef # Check reif if reif: expr = op(expr, self._get_cpo_expr(res)) self._assign_to_var(defvar, expr) else: # Check equality with a variable if op is modeler.equal: self._assign_to_var(defvar, expr) else: self._add_to_model(op(expr, self._get_cpo_expr(defvar))) def _build_scal_prod_expr(self, coefs, vars, res): """ Build a scal prod expression Args: coefs: Array of coefficients (integers) vars: Array of FZN variables res: Initial result value (integer) Returns: New CPO scal_prod expression """ # Build array of CPO variables vars = [self._get_cpo_expr(v) for v in vars] # Check developed scal_prod if len(coefs) <= 2 or (all(c == 1 or c == -1 for c in coefs)): for c, v in zip(coefs, vars): if c != 0: if is_int_value(res, 0): res = _mutl_by_int(v, c) elif c < 0: res = res - _mutl_by_int(v, -c) else: res = res + _mutl_by_int(v, c) return res # Build normal scal_prod expr = modeler.scal_prod(coefs, vars) if not is_int_value(res, 0): expr = res + expr return expr def _add_to_model(self, expr): """ Add an expression to the CPO model Args: expr: CPO expression to add """ 
#print("_add_to_model({})".format(expr)) self.model.add(expr) # Scan expression to identify used variables estack = [expr] doneset = set() # Set of expressions already processed while estack: e = estack.pop() eid = id(e) if not eid in doneset: doneset.add(eid) if e.type.is_variable: #print(" add CPO variable {}".format(e)) self.cpo_variables.add(e.name) # Stack children expressions estack.extend(e.children) def _write(self, out=None): """ Write current parser status Args: out (optional): Write output. sys.stdout if not given """ if out is None: out = sys.stdout out.write("Reader status:\n") out.write(" CPO expressions:\n") for k in sorted(self.cpo_exprs.keys()): v = self.cpo_exprs[k] out.write(" {}: {} ({})\n".format(k, v, type(v))) out.write(" Model expressions:\n") for x in self.model.get_all_expressions(): out.write(" {}\n".format(x[0]))
############################################################################### ## Utility functions ############################################################################### def _get_fzn_array(fzo): """ Get the list of objects of an FZN object Args: fzo: FZN object Returns: Array of FZN objects """ return fzo if isinstance(fzo, list) else fzo.value def _get_value(expr): """ Get the python value of an expression Args: expr: Expression (python or CPO) Returns: Python value """ return expr.value if isinstance(expr, CpoValue) else expr def _build_domain(v): """ Build a variable domain from an initial value Args: v: Variable initial value Returns: Variable domain """ if v is True: return (1,) if v is False: return (0,) return (v,) # def _build_cpo_value(v): # """ Build a value from a single Python value # Args: # v: Value # Returns: # Corresponding CPO value # """ # if v is True: # v = 1 # elif v is False: # v = 0 # return build_cpo_expr(v) def _mutl_by_int(expr, val): """ Create an expression that multiply an expression by an integer Args: expr: Expression to constrain val: Integer value to multiply expression with Returns: New expression """ if val == 1: return expr if val == -1: return -expr if val == 0: return 0 return val * expr