Subject: [PATCH] [CP-SAT] add new scheduling example; improve hint preservation; fix rare crash in presolve
domains[i].Min() : domains[i].Max(); const int64_t other_value = result.solution[i]; if (best_value == other_value) { - if (!context_->IntersectDomainWith(ct->linear().vars(i), - Domain(best_value))) { + if (!context_->IntersectDomainWithAndUpdateHint( + ct->linear().vars(i), Domain(best_value))) { return false; } continue; } + context_->UpdateVarSolutionHint( + ct->linear().vars(i), context_->LiteralSolutionHint(indicator) + ? other_value + : best_value); if (RefIsPositive(indicator)) { if (!context_->StoreAffineRelation(ct->linear().vars(i), indicator, other_value - best_value, @@ -11763,7 +11768,8 @@ void CpModelPresolver::ProcessVariableOnlyUsedInEncoding(int var) { int64_t value1, value2; if (cost == 0) { context_->UpdateRuleStats("variables: fix singleton var in linear1"); - return (void)context_->IntersectDomainWith(var, Domain(implied.Min())); + return (void)context_->IntersectDomainWithAndUpdateHint( + var, Domain(implied.Min())); } else if (cost > 0) { value1 = context_->MinOf(var); value2 = implied.Min(); @@ -11775,7 +11781,7 @@ void CpModelPresolver::ProcessVariableOnlyUsedInEncoding(int var) { // Nothing else to do in this case, the constraint will be reduced to // a pure Boolean constraint later. context_->UpdateRuleStats("variables: reduced domain to two values"); - return (void)context_->IntersectDomainWith( + return (void)context_->IntersectDomainWithAndUpdateHint( var, Domain::FromValues({value1, value2})); } } @@ -12004,6 +12010,7 @@ void CpModelPresolver::ProcessVariableOnlyUsedInEncoding(int var) { } PresolveAtMostOne(new_ct); } + if (context_->ModelIsUnsat()) return; // Add enough constraints to the mapping model to recover a valid value // for var when all the booleans are fixed. 
diff --git a/ortools/sat/docs/scheduling.md b/ortools/sat/docs/scheduling.md index 091ab8891f..58b205b681 100644 --- a/ortools/sat/docs/scheduling.md +++ b/ortools/sat/docs/scheduling.md @@ -2609,4 +2609,265 @@ def transitions_in_no_overlap_sample_sat(): transitions_in_no_overlap_sample_sat() ``` +## Managing sequences in a no_overlap constraint + +In some scheduling problems, there can be constraints on sequence of tasks. For +instance, tasks of a given type may have limit on any contiguous constraints. + +The circuit constraint is used to maintain the current length of any sequence. + +### Python code + +```python +#!/usr/bin/env python3 +"""Implements sequence constraints in a no_overlap constraint.""" + +from typing import Dict, List, Sequence, Tuple + +from ortools.sat.python import cp_model + + +def sequence_constraints_with_circuit( + model: cp_model.CpModel, + starts: Sequence[cp_model.IntVar], + durations: Sequence[int], + task_types: Sequence[str], + lengths: Sequence[cp_model.IntVar], + cumuls: Sequence[cp_model.IntVar], + sequence_length_constraints: Dict[str, Tuple[int, int]], + sequence_cumul_constraints: Dict[str, Tuple[int, int, int]], +) -> Sequence[Tuple[cp_model.IntVar, int]]: + """This method uses a circuit constraint to rank tasks. + + This method assumes that all starts are disjoint, meaning that all tasks have + a strictly positive duration, and they appear in the same NoOverlap + constraint. + + The extra node (with id 0) will be used to decide which task is first with + its only outgoing arc, and which task is last with its only incoming arc. + Each task i will be associated with id i + 1, and an arc between i + 1 and j + + 1 indicates that j is the immediate successor of i. + + The circuit constraint ensures there is at most 1 hamiltonian cycle of + length > 1. If no such path exists, then no tasks are active. + We also need to enforce that any hamiltonian cycle of size > 1 must contain + the node 0. 
And thus, there is a self loop on node 0 iff the circuit is empty. + + Args: + model: The CpModel to add the constraints to. + starts: The array of starts variables of all tasks. + durations: the durations of all tasks. + task_types: The type of all tasks. + lengths: The computed length of the current sequence for each task. + cumuls: The computed cumul of the current sequence for each task. + sequence_length_constraints: the array of tuple (`task_type`, + (`length_min`, `length_max`)) that specifies the minimum and maximum + length of the sequence of tasks of type `task_type`. + sequence_cumul_constraints: the array of tuple (`task_type`, + (`soft_max`, `linear_penalty`, `hard_max`)) that specifies that if the + cumul of the sequence of tasks of type `task_type` is greater than + `soft_max`, then `linear_penalty` must be added to the cost + + Returns: + The list of pairs (Boolean variables, penalty) to be added to the objective. + """ + + num_tasks = len(starts) + all_tasks = range(num_tasks) + + arcs: List[cp_model.ArcT] = [] + penalty_terms = [] + for i in all_tasks: + # if node i is first. + start_lit = model.new_bool_var(f"start_{i}") + arcs.append((0, i + 1, start_lit)) + model.add(lengths[i] == 1).only_enforce_if(start_lit) + model.add(cumuls[i] == durations[i]).only_enforce_if(start_lit) + + # As there are no other constraints on the problem, we can add this + # redundant constraint. + model.add(starts[i] == 0).only_enforce_if(start_lit) + + # if node i is last. + end_lit = model.new_bool_var(f"end_{i}") + arcs.append((i + 1, 0, end_lit)) + + # Penalize the cumul of the last task w.r.t. 
the soft max + soft_max, linear_penalty, hard_max = sequence_cumul_constraints[task_types[i]] + if soft_max < hard_max: + aux = model.new_int_var(0, hard_max - soft_max, f"aux_{i}") + model.add_max_equality(aux, [0, cumuls[i] - soft_max]) + + excess = model.new_int_var(0, hard_max - soft_max, f"excess_{i}") + model.add(excess == aux).only_enforce_if(end_lit) + model.add(excess == 0).only_enforce_if(~end_lit) + penalty_terms.append((excess, linear_penalty)) + + for j in all_tasks: + if i == j: + continue + lit = model.new_bool_var(f"arc_{i}_to_{j}") + arcs.append((i + 1, j + 1, lit)) + + # To perform the transitive reduction from precedences to successors, + # we need to tie the starts of the tasks with 'literal'. + # In a non pure problem, the following equality must be an inequality. + model.add(starts[j] == starts[i] + durations[i]).only_enforce_if(lit) + + # We add the constraint to incrementally maintain the length and the cumul + # variables of the sequence. + if task_types[i] == task_types[j]: # Same task type. + # Increase the length of the sequence by 1. + model.add(lengths[j] == lengths[i] + 1).only_enforce_if(lit) + + # Make sure the length of the sequence is within the bounds. + length_max = sequence_length_constraints[task_types[j]][1] + model.add(lengths[j] <= length_max).only_enforce_if(lit) + + # Increase the cumul of the sequence by the duration of the task. + model.add(cumuls[j] == cumuls[i] + durations[j]).only_enforce_if(lit) + + # Make sure the cumul of the sequence is within the bounds. + cumul_hard_max = sequence_cumul_constraints[task_types[j]][2] + model.add(cumuls[j] <= cumul_hard_max).only_enforce_if(lit) + + else: # Switching task type. + # Reset the length to 1. + model.add(lengths[j] == 1).only_enforce_if(lit) + + # Make sure the previous length is within bounds. + length_min = sequence_length_constraints[task_types[i]][0] + model.add(lengths[i] >= length_min).only_enforce_if(lit) + + # Reset the cumul to the duration of the task. 
+ model.add(cumuls[j] == durations[j]).only_enforce_if(lit) + + # Penalize the cumul of the previous task w.r.t. the soft max + if soft_max < hard_max: + aux = model.new_int_var(0, hard_max - soft_max, f"aux_{i}") + model.add_max_equality(aux, [0, cumuls[i] - soft_max]) + + excess = model.new_int_var(0, hard_max - soft_max, f"excess_{i}") + model.add(excess == aux).only_enforce_if(lit) + model.add(excess == 0).only_enforce_if(~lit) + penalty_terms.append((excess, linear_penalty)) + + # Add the circuit constraint. + model.add_circuit(arcs) + + return penalty_terms + + +def sequences_in_no_overlap_sample_sat(): + """Implement cumul and length constraints in a NoOverlap constraint.""" + + # Tasks (duration, type). + tasks = [ + (5, "A"), + (6, "A"), + (7, "A"), + (2, "A"), + (3, "A"), + (5, "B"), + (2, "B"), + (3, "B"), + (1, "B"), + (4, "B"), + (3, "B"), + (6, "B"), + (2, "B"), + ] + + # Sequence length constraints on task_types: (hard_min, hard_max) + sequence_length_constraints = { + "A": (1, 3), + "B": (2, 2), + } + + # Sequence cumul constraints on task_types: + # (soft_max, linear_penalty, hard_max) + sequence_cumul_constraints = { + "A": (6, 1, 10), + "B": (7, 0, 7), + } + + model: cp_model.CpModel = cp_model.CpModel() + horizon: int = sum(t[0] for t in tasks) + + num_tasks = len(tasks) + all_tasks = range(num_tasks) + + starts = [] + durations = [] + intervals = [] + task_types = [] + + # Creates intervals for each task. + for duration, task_type in tasks: + index = len(starts) + start = model.new_int_var(0, horizon - duration, f"start[{index}]") + interval = model.new_fixed_size_interval_var( + start, duration, f"interval[{index}]" + ) + + starts.append(start) + durations.append(duration) + intervals.append(interval) + task_types.append(task_type) + + # Create length variables for each task. 
+ lengths = [] + max_length = max(c[1] for c in sequence_length_constraints.values()) + for i in all_tasks: + lengths.append(model.new_int_var(0, max_length, f"length_{i}")) + + # Create cumul variables for each task. + cumuls = [] + max_cumul = max(c[2] for c in sequence_cumul_constraints.values()) + for i in all_tasks: + cumuls.append(model.new_int_var(0, max_cumul, f"cumul_{i}")) + + # Adds NoOverlap constraint. + model.add_no_overlap(intervals) + + # Adds the constraints on the partial lengths and cumuls of the sequence of + # tasks. + penalty_terms = sequence_constraints_with_circuit( + model, + starts, + durations, + task_types, + lengths, + cumuls, + sequence_length_constraints, + sequence_cumul_constraints, + ) + + # Minimize the sum of penalties, + model.minimize(sum(var * penalty for var, penalty in penalty_terms)) + + # Solves the model model. + solver = cp_model.CpSolver() + status = solver.solve(model) + + if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE: + # Prints out the makespan and the start times and ranks of all tasks. 
+ print(f"Optimal cost: {solver.objective_value}") + to_sort = [] + for t in all_tasks: + to_sort.append((solver.value(starts[t]), t)) + to_sort.sort() + for start, t in to_sort: + print( + f"Task {t} of type {task_types[t]} with duration" + f" {durations[t]} starts at {start}, length =" + f" {solver.value(lengths[t])}, cumul = {solver.value(cumuls[t])} " + ) + else: + print(f"Solver exited with nonoptimal status: {status}") + + +sequences_in_no_overlap_sample_sat() +``` + ## Convex hull of a set of intervals diff --git a/ortools/sat/presolve_context.cc b/ortools/sat/presolve_context.cc index c40249c5ff..d605f385a7 100644 --- a/ortools/sat/presolve_context.cc +++ b/ortools/sat/presolve_context.cc @@ -138,33 +138,49 @@ int PresolveContext::NewBoolVar(absl::string_view source) { int PresolveContext::NewBoolVarWithClause(absl::Span clause) { const int new_var = NewBoolVar("with clause"); if (hint_is_loaded_) { + int new_hint = 0; bool all_have_hint = true; for (const int literal : clause) { const int var = PositiveRef(literal); if (!hint_has_value_[var]) { all_have_hint = false; - continue; + break; } - if (RefIsPositive(literal)) { - if (hint_[var] == 1) { - hint_has_value_[new_var] = true; - hint_[new_var] = 1; - break; - } - } else { - if (hint_[var] == 0) { - hint_has_value_[new_var] = true; - hint_[new_var] = 1; - break; - } + if (hint_[var] == (RefIsPositive(literal) ? 1 : 0)) { + new_hint = 1; + break; } } - - // If all literals where hinted and at zero, we set the hint of - // new_var to zero, otherwise we leave it unassigned. - if (all_have_hint && !hint_has_value_[new_var]) { + // Leave the `new_var` hint unassigned if any literal is not hinted. 
+ if (all_have_hint) { hint_has_value_[new_var] = true; - hint_[new_var] = 0; + hint_[new_var] = new_hint; + } + } + return new_var; +} + +int PresolveContext::NewBoolVarWithConjunction( + absl::Span conjunction) { + const int new_var = NewBoolVar("with conjunction"); + if (hint_is_loaded_) { + int new_hint = 1; + bool all_have_hint = true; + for (const int literal : conjunction) { + const int var = PositiveRef(literal); + if (!hint_has_value_[var]) { + all_have_hint = false; + break; + } + if (hint_[var] == (RefIsPositive(literal) ? 0 : 1)) { + new_hint = 0; + break; + } + } + // Leave the `new_var` hint unassigned if any literal is not hinted. + if (all_have_hint) { + hint_has_value_[new_var] = true; + hint_[new_var] = new_hint; } } return new_var; @@ -1492,6 +1508,9 @@ void PresolveContext::CanonicalizeDomainOfSizeTwo(int var) { } else { UpdateRuleStats("variables with 2 values: create encoding literal"); max_literal = NewBoolVar("var with 2 values"); + if (hint_is_loaded_ && hint_has_value_[var]) { + SetNewVariableHint(max_literal, hint_[var] == var_max ? 1 : 0); + } min_literal = NegatedRef(max_literal); var_map[var_min] = SavedLiteral(min_literal); var_map[var_max] = SavedLiteral(max_literal); @@ -1830,14 +1849,18 @@ int PresolveContext::GetOrCreateVarValueEncoding(int ref, int64_t value) { return value == 1 ? representative : NegatedRef(representative); } else { const int literal = NewBoolVar("integer encoding"); - InsertVarValueEncoding(literal, var, var_max); + if (!InsertVarValueEncoding(literal, var, var_max)) { + return GetFalseLiteral(); + } const int representative = GetLiteralRepresentative(literal); return value == var_max ? 
representative : NegatedRef(representative); } } const int literal = NewBoolVar("integer encoding"); - InsertVarValueEncoding(literal, var, value); + if (!InsertVarValueEncoding(literal, var, value)) { + return GetFalseLiteral(); + } return GetLiteralRepresentative(literal); } diff --git a/ortools/sat/presolve_context.h b/ortools/sat/presolve_context.h index 5445b8305f..9c288d8e9e 100644 --- a/ortools/sat/presolve_context.h +++ b/ortools/sat/presolve_context.h @@ -115,6 +115,10 @@ class PresolveContext { // Its hint value will be the same as the value of the given clause. int NewBoolVarWithClause(absl::Span clause); + // Create a new bool var. + // Its hint value will be the same as the value of the given conjunction. + int NewBoolVarWithConjunction(absl::Span conjunction); + // Some expansion code use constant literal to be simpler to write. This will // create a NewBoolVar() the first time, but later call will just returns it. int GetTrueLiteral(); diff --git a/ortools/sat/samples/BUILD.bazel b/ortools/sat/samples/BUILD.bazel index 95de48b809..b53bcc8071 100644 --- a/ortools/sat/samples/BUILD.bazel +++ b/ortools/sat/samples/BUILD.bazel @@ -94,6 +94,8 @@ code_sample_py(name = "scheduling_with_calendar_sample_sat") code_sample_cc_go_py(name = "search_for_all_solutions_sample_sat") +code_sample_py(name = "sequences_in_no_overlap_sample_sat") + code_sample_cc_go_py(name = "simple_sat_program") code_sample_cc_go_py(name = "solution_hinting_sample_sat") diff --git a/ortools/sat/samples/sequences_in_no_overlap_sample_sat.py b/ortools/sat/samples/sequences_in_no_overlap_sample_sat.py new file mode 100644 index 0000000000..fa4f7bd35d --- /dev/null +++ b/ortools/sat/samples/sequences_in_no_overlap_sample_sat.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# Copyright 2010-2024 Google LLC +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implements sequence constraints in a no_overlap constraint.""" + +from typing import Dict, List, Sequence, Tuple + +from ortools.sat.python import cp_model + + +def sequence_constraints_with_circuit( + model: cp_model.CpModel, + starts: Sequence[cp_model.IntVar], + durations: Sequence[int], + task_types: Sequence[str], + lengths: Sequence[cp_model.IntVar], + cumuls: Sequence[cp_model.IntVar], + sequence_length_constraints: Dict[str, Tuple[int, int]], + sequence_cumul_constraints: Dict[str, Tuple[int, int, int]], +) -> Sequence[Tuple[cp_model.IntVar, int]]: + """This method uses a circuit constraint to rank tasks. + + This method assumes that all starts are disjoint, meaning that all tasks have + a strictly positive duration, and they appear in the same NoOverlap + constraint. + + The extra node (with id 0) will be used to decide which task is first with + its only outgoing arc, and which task is last with its only incoming arc. + Each task i will be associated with id i + 1, and an arc between i + 1 and j + + 1 indicates that j is the immediate successor of i. + + The circuit constraint ensures there is at most 1 hamiltonian cycle of + length > 1. If no such path exists, then no tasks are active. + We also need to enforce that any hamiltonian cycle of size > 1 must contain + the node 0. And thus, there is a self loop on node 0 iff the circuit is empty. + + Args: + model: The CpModel to add the constraints to. + starts: The array of starts variables of all tasks. + durations: the durations of all tasks. + task_types: The type of all tasks. 
+ lengths: The computed length of the current sequence for each task. + cumuls: The computed cumul of the current sequence for each task. + sequence_length_constraints: the array of tuple (`task_type`, + (`length_min`, `length_max`)) that specifies the minimum and maximum + length of the sequence of tasks of type `task_type`. + sequence_cumul_constraints: the array of tuple (`task_type`, + (`soft_max`, `linear_penalty`, `hard_max`)) that specifies that if the + cumul of the sequence of tasks of type `task_type` is greater than + `soft_max`, then `linear_penalty` must be added to the cost + + Returns: + The list of pairs (Boolean variables, penalty) to be added to the objective. + """ + + num_tasks = len(starts) + all_tasks = range(num_tasks) + + arcs: List[cp_model.ArcT] = [] + penalty_terms = [] + for i in all_tasks: + # if node i is first. + start_lit = model.new_bool_var(f"start_{i}") + arcs.append((0, i + 1, start_lit)) + model.add(lengths[i] == 1).only_enforce_if(start_lit) + model.add(cumuls[i] == durations[i]).only_enforce_if(start_lit) + + # As there are no other constraints on the problem, we can add this + # redundant constraint. + model.add(starts[i] == 0).only_enforce_if(start_lit) + + # if node i is last. + end_lit = model.new_bool_var(f"end_{i}") + arcs.append((i + 1, 0, end_lit)) + + # Penalize the cumul of the last task w.r.t. 
the soft max + soft_max, linear_penalty, hard_max = sequence_cumul_constraints[task_types[i]] + if soft_max < hard_max: + aux = model.new_int_var(0, hard_max - soft_max, f"aux_{i}") + model.add_max_equality(aux, [0, cumuls[i] - soft_max]) + + excess = model.new_int_var(0, hard_max - soft_max, f"excess_{i}") + model.add(excess == aux).only_enforce_if(end_lit) + model.add(excess == 0).only_enforce_if(~end_lit) + penalty_terms.append((excess, linear_penalty)) + + for j in all_tasks: + if i == j: + continue + lit = model.new_bool_var(f"arc_{i}_to_{j}") + arcs.append((i + 1, j + 1, lit)) + + # To perform the transitive reduction from precedences to successors, + # we need to tie the starts of the tasks with 'literal'. + # In a non pure problem, the following equality must be an inequality. + model.add(starts[j] == starts[i] + durations[i]).only_enforce_if(lit) + + # We add the constraint to incrementally maintain the length and the cumul + # variables of the sequence. + if task_types[i] == task_types[j]: # Same task type. + # Increase the length of the sequence by 1. + model.add(lengths[j] == lengths[i] + 1).only_enforce_if(lit) + + # Make sure the length of the sequence is within the bounds. + length_max = sequence_length_constraints[task_types[j]][1] + model.add(lengths[j] <= length_max).only_enforce_if(lit) + + # Increase the cumul of the sequence by the duration of the task. + model.add(cumuls[j] == cumuls[i] + durations[j]).only_enforce_if(lit) + + # Make sure the cumul of the sequence is within the bounds. + cumul_hard_max = sequence_cumul_constraints[task_types[j]][2] + model.add(cumuls[j] <= cumul_hard_max).only_enforce_if(lit) + + else: # Switching task type. + # Reset the length to 1. + model.add(lengths[j] == 1).only_enforce_if(lit) + + # Make sure the previous length is within bounds. + length_min = sequence_length_constraints[task_types[i]][0] + model.add(lengths[i] >= length_min).only_enforce_if(lit) + + # Reset the cumul to the duration of the task. 
+ model.add(cumuls[j] == durations[j]).only_enforce_if(lit) + + # Penalize the cumul of the previous task w.r.t. the soft max + if soft_max < hard_max: + aux = model.new_int_var(0, hard_max - soft_max, f"aux_{i}") + model.add_max_equality(aux, [0, cumuls[i] - soft_max]) + + excess = model.new_int_var(0, hard_max - soft_max, f"excess_{i}") + model.add(excess == aux).only_enforce_if(lit) + model.add(excess == 0).only_enforce_if(~lit) + penalty_terms.append((excess, linear_penalty)) + + # Add the circuit constraint. + model.add_circuit(arcs) + + return penalty_terms + + +def sequences_in_no_overlap_sample_sat(): + """Implement cumul and length constraints in a NoOverlap constraint.""" + + # Tasks (duration, type). + tasks = [ + (5, "A"), + (6, "A"), + (7, "A"), + (2, "A"), + (3, "A"), + (5, "B"), + (2, "B"), + (3, "B"), + (1, "B"), + (4, "B"), + (3, "B"), + (6, "B"), + (2, "B"), + ] + + # Sequence length constraints on task_types: (hard_min, hard_max) + sequence_length_constraints = { + "A": (1, 3), + "B": (2, 2), + } + + # Sequence cumul constraints on task_types: + # (soft_max, linear_penalty, hard_max) + sequence_cumul_constraints = { + "A": (6, 1, 10), + "B": (7, 0, 7), + } + + model: cp_model.CpModel = cp_model.CpModel() + horizon: int = sum(t[0] for t in tasks) + + num_tasks = len(tasks) + all_tasks = range(num_tasks) + + starts = [] + durations = [] + intervals = [] + task_types = [] + + # Creates intervals for each task. + for duration, task_type in tasks: + index = len(starts) + start = model.new_int_var(0, horizon - duration, f"start[{index}]") + interval = model.new_fixed_size_interval_var( + start, duration, f"interval[{index}]" + ) + + starts.append(start) + durations.append(duration) + intervals.append(interval) + task_types.append(task_type) + + # Create length variables for each task. 
+ lengths = [] + max_length = max(c[1] for c in sequence_length_constraints.values()) + for i in all_tasks: + lengths.append(model.new_int_var(0, max_length, f"length_{i}")) + + # Create cumul variables for each task. + cumuls = [] + max_cumul = max(c[2] for c in sequence_cumul_constraints.values()) + for i in all_tasks: + cumuls.append(model.new_int_var(0, max_cumul, f"cumul_{i}")) + + # Adds NoOverlap constraint. + model.add_no_overlap(intervals) + + # Adds the constraints on the partial lengths and cumuls of the sequence of + # tasks. + penalty_terms = sequence_constraints_with_circuit( + model, + starts, + durations, + task_types, + lengths, + cumuls, + sequence_length_constraints, + sequence_cumul_constraints, + ) + + # Minimize the sum of penalties, + model.minimize(sum(var * penalty for var, penalty in penalty_terms)) + + # Solves the model model. + solver = cp_model.CpSolver() + status = solver.solve(model) + + if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE: + # Prints out the makespan and the start times and ranks of all tasks. + print(f"Optimal cost: {solver.objective_value}") + to_sort = [] + for t in all_tasks: + to_sort.append((solver.value(starts[t]), t)) + to_sort.sort() + for start, t in to_sort: + print( + f"Task {t} of type {task_types[t]} with duration" + f" {durations[t]} starts at {start}, length =" + f" {solver.value(lengths[t])}, cumul = {solver.value(cumuls[t])} " + ) + else: + print(f"Solver exited with nonoptimal status: {status}") + + +sequences_in_no_overlap_sample_sat()