#!/usr/bin/env python3
# Copyright 2010-2022 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sat based solver for the RCPSP problems (see rcpsp.proto).

Introduction to the problem:
   https://www.projectmanagement.ugent.be/research/project_scheduling/rcpsp

Data used in flags:
  http://www.om-db.wi.tum.de/psplib/data.html
"""

import collections

from absl import app
from absl import flags
from google.protobuf import text_format

from ortools.sat.python import cp_model
from ortools.scheduling import pywraprcpsp
from ortools.scheduling import rcpsp_pb2

FLAGS = flags.FLAGS
_INPUT = flags.DEFINE_string('input', '', 'Input file to parse and solve.')
_OUTPUT_PROTO = flags.DEFINE_string(
    'output_proto', '', 'Output file to write the cp_model proto to.')
_PARAMS = flags.DEFINE_string('params', '', 'Sat solver parameters.')
_USE_INTERVAL_MAKESPAN = flags.DEFINE_bool(
    'use_interval_makespan', True,
    'Whether we encode the makespan using an interval or not.')
_HORIZON = flags.DEFINE_integer('horizon', -1, 'Force horizon.')
_ADD_REDUNDANT_ENERGETIC_CONSTRAINTS = flags.DEFINE_bool(
    'add_redundant_energetic_constraints', False,
    'Add redundant energetic constraints on the pairs of tasks extracted from'
    + ' precedence graph.')
_DELAY_TIME_LIMIT = flags.DEFINE_float(
    'delay_time_limit', 0.0,
    'Time limit when computing min delay between tasks.' +
    ' A non-positive time limit disable min delays computation.')
_PREEMPTIVE_LB_TIME_LIMIT = flags.DEFINE_float(
    'preemptive_lb_time_limit', 0.0,
    'Time limit when computing a preemptive schedule lower bound.' +
    ' A non-positive time limit disable this computation.')


def PrintProblemStatistics(problem):
    """Display various statistics on the problem."""

    # Determine problem type.
    problem_type = ('Resource Investment Problem'
                    if problem.is_resource_investment else 'RCPSP')

    num_resources = len(problem.resources)
    num_tasks = len(problem.tasks) - 2  # 2 sentinels.
    tasks_with_alternatives = 0
    variable_duration_tasks = 0
    tasks_with_delay = 0

    for task in problem.tasks:
        if len(task.recipes) > 1:
            tasks_with_alternatives += 1
        duration_0 = task.recipes[0].duration
        for recipe in task.recipes:
            if recipe.duration != duration_0:
                variable_duration_tasks += 1
                break
        if task.successor_delays:
            tasks_with_delay += 1

    if problem.is_rcpsp_max:
        problem_type += '/Max delay'
    # We print 2 less tasks as these are sentinel tasks that are not counted
    # in the description of the rcpsp models.
    if problem.is_consumer_producer:
        print(f'Solving {problem_type} with:')
        print(f'  - {num_resources} reservoir resources')
        print(f'  - {num_tasks} tasks')
    else:
        print(f'Solving {problem_type} with:')
        print(f'  - {num_resources} renewable resources')
        print(f'  - {num_tasks} tasks')
        if tasks_with_alternatives:
            print(f'  - {tasks_with_alternatives} tasks with alternative'
                  ' resources')
        if variable_duration_tasks:
            print(f'  - {variable_duration_tasks} tasks with variable'
                  ' durations')
        if tasks_with_delay:
            print(f'  - {tasks_with_delay} tasks with successor delays')


def AnalyseDependencyGraph(problem):
    """Analyses the dependency graph to improve the model.

    Args:
      problem: the protobuf of the problem to solve.

    Returns:
      A tuple (pairs, after) where pairs is a list of
      (task1, task2, in_between_tasks) with task2 an indirect successor of
      task1, and in_between_tasks being the set of all tasks after task1 and
      before task2; and after is a dict mapping each task to the set of its
      direct and indirect successors.
    """

    num_nodes = len(problem.tasks)
    print(f'Analysing the dependency graph over {num_nodes} nodes')
    ins = collections.defaultdict(list)
    outs = collections.defaultdict(list)
    after = collections.defaultdict(set)
    before = collections.defaultdict(set)

    # Build the transitive closure of the precedences.
    # This algorithm has the wrong complexity (n^4), but is OK for the psplib
    # as the biggest example has 120 nodes.
    for n in range(num_nodes):
        for s in problem.tasks[n].successors:
            ins[s].append(n)
            outs[n].append(s)

            for a in list(after[s]) + [s]:
                for b in list(before[n]) + [n]:
                    after[b].add(a)
                    before[a].add(b)

    # Search for pairs of tasks with at least two parallel branches between
    # them in the precedence graph.
    result = []
    for source, start_outs in outs.items():
        if len(start_outs) <= 1:
            # Starting with the unique successor of source will be as good.
            continue
        for sink, end_ins in ins.items():
            if len(end_ins) <= 1:
                # Ending with the unique predecessor of sink will be as good.
                continue
            if sink == source:
                continue
            if sink not in after[source]:
                continue

            num_active_outgoing_branches = 0
            num_active_incoming_branches = 0
            for succ in outs[source]:
                if sink in after[succ]:
                    num_active_outgoing_branches += 1
            for pred in ins[sink]:
                if source in before[pred]:
                    num_active_incoming_branches += 1

            if (num_active_outgoing_branches <= 1 or
                    num_active_incoming_branches <= 1):
                continue

            common = after[source].intersection(before[sink])
            if len(common) <= 1:
                continue
            result.append((source, sink, common))

    # Sort entries lexicographically by (len(common), source, sink).
    def Price(entry):
        return (num_nodes * num_nodes * len(entry[2]) +
                num_nodes * entry[0] + entry[1])

    result.sort(key=Price)
    print(f'  - created {len(result)} pairs of nodes to examine', flush=True)
    return result, after


def SolveRcpsp(problem,
               proto_file,
               params,
               active_tasks,
               source,
               sink,
               intervals_of_tasks,
               delays,
               in_main_solve=False,
               initial_solution=None,
               lower_bound=0):
    """Parse and solve a given RCPSP problem in proto format.

    The model will only look at the tasks {source} + {sink} + active_tasks,
    and ignore all others.

    Args:
      problem: the description of the model to solve in protobuf format
      proto_file: the name of the file to export the CpModel proto to.
      params: the string representation of the parameters to pass to the sat
        solver.
      active_tasks: the set of active tasks to consider.
      source: the source task in the graph. Its end will be forced to 0.
      sink: the sink task of the graph. Its start is the makespan of the
        problem.
      intervals_of_tasks: a heuristic lists of (task1, task2, tasks) used to
        add redundant energetic equations to the model.
      delays: a list of (task1, task2, min_delays) used to add extended
        precedence constraints (start(task2) >= end(task1) + min_delay).
      in_main_solve: indicates if this is the main solve procedure.
      initial_solution: A valid assignment used to hint the search.
      lower_bound: A valid lower bound of the makespan objective.

    Returns:
      (lower_bound of the objective, best solution found, assignment)
    """
    # Create the model.
    model = cp_model.CpModel()
    model.SetName(problem.name)

    num_resources = len(problem.resources)

    all_active_tasks = list(active_tasks)
    all_active_tasks.sort()
    all_resources = range(num_resources)

    horizon = problem.deadline if problem.deadline != -1 else problem.horizon
    if _HORIZON.value > 0:
        horizon = _HORIZON.value
    elif delays and in_main_solve and (source, sink) in delays:
        horizon = delays[(source, sink)][1]
    elif horizon == -1:  # Naive computation.
        horizon = sum(
            max(r.duration for r in t.recipes) for t in problem.tasks)
        if problem.is_rcpsp_max:
            for t in problem.tasks:
                for sd in t.successor_delays:
                    for rd in sd.recipe_delays:
                        for d in rd.min_delays:
                            horizon += abs(d)
    if in_main_solve:
        print(f'Horizon = {horizon}', flush=True)

    # Containers.
    task_starts = {}
    task_ends = {}
    task_durations = {}
    task_intervals = {}
    task_resource_to_energy = {}
    task_to_resource_demands = collections.defaultdict(list)

    task_to_presence_literals = collections.defaultdict(list)
    task_to_recipe_durations = collections.defaultdict(list)
    task_resource_to_fixed_demands = collections.defaultdict(dict)
    task_resource_to_max_energy = collections.defaultdict(int)

    resource_to_sum_of_demand_max = collections.defaultdict(int)

    # Create task variables.
    for t in all_active_tasks:
        task = problem.tasks[t]
        num_recipes = len(task.recipes)
        all_recipes = range(num_recipes)

        start_var = model.NewIntVar(0, horizon, f'start_of_task_{t}')
        end_var = model.NewIntVar(0, horizon, f'end_of_task_{t}')

        literals = []
        if num_recipes > 1:
            # Create one literal per recipe.
            literals = [
                model.NewBoolVar(f'is_present_{t}_{r}') for r in all_recipes
            ]

            # Exactly one recipe must be performed.
            model.AddExactlyOne(literals)
        else:
            literals = [1]

        # Temporary data structure to fill in 0 demands.
        demand_matrix = collections.defaultdict(int)

        # Scan recipes and build the demand matrix and the vector of
        # durations.
        for recipe_index, recipe in enumerate(task.recipes):
            task_to_recipe_durations[t].append(recipe.duration)
            for demand, resource in zip(recipe.demands, recipe.resources):
                demand_matrix[(resource, recipe_index)] = demand

        # Create the duration variable from the accumulated durations.
        duration_var = model.NewIntVarFromDomain(
            cp_model.Domain.FromValues(task_to_recipe_durations[t]),
            f'duration_of_task_{t}')

        # Link the recipe literals and the duration_var.
        for r in range(num_recipes):
            model.Add(duration_var ==
                      task_to_recipe_durations[t][r]).OnlyEnforceIf(
                          literals[r])

        # Create the interval of the task.
        task_interval = model.NewIntervalVar(start_var, duration_var, end_var,
                                             f'task_interval_{t}')

        # Store task variables.
        task_starts[t] = start_var
        task_ends[t] = end_var
        task_durations[t] = duration_var
        task_intervals[t] = task_interval
        task_to_presence_literals[t] = literals

        # Create the demand variable of the task for each resource.
        for res in all_resources:
            demands = [demand_matrix[(res, recipe)] for recipe in all_recipes]
            task_resource_to_fixed_demands[(t, res)] = demands
            demand_var = model.NewIntVarFromDomain(
                cp_model.Domain.FromValues(demands), f'demand_{t}_{res}')
            task_to_resource_demands[t].append(demand_var)

            # Link the recipe literals and the demand_var.
            for r in all_recipes:
                model.Add(demand_var == demand_matrix[(
                    res, r)]).OnlyEnforceIf(literals[r])

            resource_to_sum_of_demand_max[res] += max(demands)

        # Create the energy expression for (task, resource):
        for res in all_resources:
            task_resource_to_energy[(t, res)] = sum(
                literals[r] * task_to_recipe_durations[t][r] *
                task_resource_to_fixed_demands[(t, res)][r]
                for r in all_recipes)
            task_resource_to_max_energy[(t, res)] = max(
                task_to_recipe_durations[t][r] *
                task_resource_to_fixed_demands[(t, res)][r]
                for r in all_recipes)

    # Create makespan variable.
    makespan = model.NewIntVar(lower_bound, horizon, 'makespan')
    makespan_size = model.NewIntVar(1, horizon, 'interval_makespan_size')
    interval_makespan = model.NewIntervalVar(makespan, makespan_size,
                                             model.NewConstant(horizon + 1),
                                             'interval_makespan')

    # Add precedences.
    if problem.is_rcpsp_max:
        # In RCPSP/Max problems, precedences are given as max delays (possibly
        # negative) between the starts of two tasks.
        for task_id in all_active_tasks:
            task = problem.tasks[task_id]
            num_modes = len(task.recipes)

            for successor_index in range(len(task.successors)):
                next_id = task.successors[successor_index]
                delay_matrix = task.successor_delays[successor_index]
                num_next_modes = len(problem.tasks[next_id].recipes)
                for m1 in range(num_modes):
                    s1 = task_starts[task_id]
                    p1 = task_to_presence_literals[task_id][m1]
                    if next_id == sink:
                        delay = delay_matrix.recipe_delays[m1].min_delays[0]
                        model.Add(s1 + delay <= makespan).OnlyEnforceIf(p1)
                    else:
                        for m2 in range(num_next_modes):
                            delay = delay_matrix.recipe_delays[m1].min_delays[
                                m2]
                            s2 = task_starts[next_id]
                            p2 = task_to_presence_literals[next_id][m2]
                            model.Add(s1 + delay <= s2).OnlyEnforceIf(
                                [p1, p2])
    else:
        # Normal dependencies (task ends before the start of successors).
        for t in all_active_tasks:
            for n in problem.tasks[t].successors:
                if n == sink:
                    model.Add(task_ends[t] <= makespan)
                elif n in active_tasks:
                    model.Add(task_ends[t] <= task_starts[n])

    # Containers for resource investment problems.
    capacities = []  # Capacity variables for all resources.
    max_cost = 0  # Upper bound on the investment cost.

    # Create resources.
    for res in all_resources:
        resource = problem.resources[res]
        c = resource.max_capacity
        if c == -1:
            print(f'No capacity: {resource}')
            c = resource_to_sum_of_demand_max[res]

        # RIP problems have only renewable resources, and no makespan.
        if problem.is_resource_investment or resource.renewable:
            intervals = [task_intervals[t] for t in all_active_tasks]
            demands = [
                task_to_resource_demands[t][res] for t in all_active_tasks
            ]

            if problem.is_resource_investment:
                capacity = model.NewIntVar(0, c, f'capacity_of_{res}')
                model.AddCumulative(intervals, demands, capacity)
                capacities.append(capacity)
                max_cost += c * resource.unit_cost
            else:  # Standard renewable resource.
                if _USE_INTERVAL_MAKESPAN.value:
                    intervals.append(interval_makespan)
                    demands.append(c)

                model.AddCumulative(intervals, demands, c)
        else:  # Non empty non renewable resource. (single mode only)
            if problem.is_consumer_producer:
                reservoir_starts = []
                reservoir_demands = []
                for t in all_active_tasks:
                    if task_resource_to_fixed_demands[(t, res)][0]:
                        reservoir_starts.append(task_starts[t])
                        reservoir_demands.append(
                            task_resource_to_fixed_demands[(t, res)][0])
                model.AddReservoirConstraint(reservoir_starts,
                                             reservoir_demands,
                                             resource.min_capacity,
                                             resource.max_capacity)
            else:  # No producer-consumer. We just sum the demands.
                model.Add(
                    cp_model.LinearExpr.Sum([
                        task_to_resource_demands[t][res]
                        for t in all_active_tasks
                    ]) <= c)

    # Objective.
    if problem.is_resource_investment:
        objective = model.NewIntVar(0, max_cost, 'capacity_costs')
        model.Add(objective == sum(
            problem.resources[i].unit_cost * capacities[i]
            for i in range(len(capacities))))
    else:
        objective = makespan

    model.Minimize(objective)

    # Add min delay constraints.
    if delays is not None:
        for (local_start, local_end), (min_delay, _) in delays.items():
            if local_start == source and local_end in active_tasks:
                model.Add(task_starts[local_end] >= min_delay)
            elif local_start in active_tasks and local_end == sink:
                model.Add(makespan >= task_ends[local_start] + min_delay)
            elif local_start in active_tasks and local_end in active_tasks:
                model.Add(
                    task_starts[local_end] >=
                    task_ends[local_start] + min_delay)

    problem_is_single_mode = True
    for t in all_active_tasks:
        if len(task_to_presence_literals[t]) > 1:
            problem_is_single_mode = False
            break

    # Add sentinels.
    task_starts[source] = 0
    task_ends[source] = 0
    # Bug fix: the original code appended to task_to_presence_literals[0];
    # 'source' is not always task 0 when this helper is called from
    # ComputeDelaysBetweenNodes() on a (start_task, end_task) sub-problem.
    task_to_presence_literals[source].append(True)
    task_starts[sink] = makespan
    task_to_presence_literals[sink].append(True)

    # For multi-mode problems, add a redundant energetic constraint:
    # for every (start, end, in_between_tasks) extracted from the precedence
    # graph, it adds the energetic relaxation:
    #   (start_var('end') - end_var('start')) * capacity_max >=
    #        sum of linearized energies of all tasks from 'in_between_tasks'
    if (not problem.is_resource_investment and
            not problem.is_consumer_producer and
            _ADD_REDUNDANT_ENERGETIC_CONSTRAINTS.value and in_main_solve and
            not problem_is_single_mode):
        added_constraints = 0
        ignored_constraints = 0
        for local_start, local_end, common in intervals_of_tasks:
            for res in all_resources:
                resource = problem.resources[res]
                if not resource.renewable:
                    continue
                c = resource.max_capacity
                if delays and (local_start, local_end) in delays:
                    min_delay, _ = delays[local_start, local_end]
                    sum_of_max_energies = sum(
                        task_resource_to_max_energy[(t, res)]
                        for t in common)
                    # Skip constraints that can never be violated.
                    if sum_of_max_energies <= c * min_delay:
                        ignored_constraints += 1
                        continue
                model.Add(
                    c * (task_starts[local_end] - task_ends[local_start]) >=
                    sum(task_resource_to_energy[(t, res)] for t in common))
                added_constraints += 1
        print(
            f'Added {added_constraints} redundant energetic constraints, and '
            + f'ignored {ignored_constraints} constraints.',
            flush=True)

    # Add solution hint.
    if initial_solution:
        for t in all_active_tasks:
            model.AddHint(task_starts[t], initial_solution.start_of_task[t])
            if len(task_to_presence_literals[t]) > 1:
                selected = initial_solution.selected_recipe_of_task[t]
                model.AddHint(task_to_presence_literals[t][selected], 1)

    # Write model to file.
    if proto_file:
        # Bug fix: the original message was missing the space before the
        # file name ('Writing proto to{proto_file}').
        print(f'Writing proto to {proto_file}')
        model.ExportToFile(proto_file)

    # Solve model.
    solver = cp_model.CpSolver()
    if params:
        text_format.Parse(params, solver.parameters)
    if in_main_solve:
        solver.parameters.log_search_progress = True

    status = solver.Solve(model)
    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
        assignment = rcpsp_pb2.RcpspAssignment()
        for t in range(len(problem.tasks)):
            if t in task_starts:
                assignment.start_of_task.append(solver.Value(task_starts[t]))
                for r in range(len(task_to_presence_literals[t])):
                    if solver.BooleanValue(task_to_presence_literals[t][r]):
                        assignment.selected_recipe_of_task.append(r)
                        break
            else:  # t is not an active task.
                assignment.start_of_task.append(0)
                assignment.selected_recipe_of_task.append(0)
        return (int(solver.BestObjectiveBound()),
                int(solver.ObjectiveValue()), assignment)
    return -1, -1, None


def ComputeDelaysBetweenNodes(problem, task_intervals):
    """Computes the min delays between all pairs of tasks in 'task_intervals'.

    Args:
      problem: The protobuf of the model.
      task_intervals: The output of the AnalysePrecedenceGraph().

    Returns:
      a tuple (delays, complete_problem_assignment, optimal_found) where
      delays maps (task1, task2) to (min_delay, feasible_delay),
      complete_problem_assignment is the assignment found when solving the
      full (source, sink) sub-problem if it was encountered, and
      optimal_found tells whether all delays were proven optimal.
    """
    print('Computing the minimum delay between pairs of intervals')
    delays = {}
    if (problem.is_resource_investment or problem.is_consumer_producer or
            problem.is_rcpsp_max or _DELAY_TIME_LIMIT.value <= 0.0):
        return delays, None, False

    complete_problem_assignment = None
    num_optimal_delays = 0
    num_delays_not_found = 0
    optimal_found = True
    for start_task, end_task, active_tasks in task_intervals:
        min_delay, feasible_delay, assignment = SolveRcpsp(
            problem, '',
            f'num_search_workers:16,max_time_in_seconds:{_DELAY_TIME_LIMIT.value}',
            active_tasks, start_task, end_task, [], delays)
        if min_delay != -1:
            delays[(start_task, end_task)] = min_delay, feasible_delay
            if start_task == 0 and end_task == len(problem.tasks) - 1:
                complete_problem_assignment = assignment
            if min_delay == feasible_delay:
                num_optimal_delays += 1
            else:
                optimal_found = False
        else:
            num_delays_not_found += 1
            optimal_found = False

    print(f'  - #optimal delays = {num_optimal_delays}', flush=True)
    if num_delays_not_found:
        print(f'  - #not computed delays = {num_delays_not_found}',
              flush=True)

    return delays, complete_problem_assignment, optimal_found


def AcceptNewCandidate(problem, after, demand_map, current, candidate):
    """Check if candidate is compatible with the tasks in current."""
    # Reject candidates ordered (before or after) w.r.t. any current task.
    for c in current:
        if candidate in after[c] or c in after[candidate]:
            return False

    # Reject candidates that would overload a renewable resource.
    all_resources = range(len(problem.resources))
    for res in all_resources:
        resource = problem.resources[res]
        if not resource.renewable:
            continue
        if (sum(demand_map[(t, res)] for t in current) +
                demand_map[(candidate, res)] > resource.max_capacity):
            return False

    return True


def ComputePreemptiveLowerBound(problem, after, lower_bound):
    """Computes a preemptive lower bound for the makespan statically.

    For this, it breaks all intervals into a set of intervals of size one.
    Then it will try to assign all of them in a minimum number of
    configurations. This is a standard complete set covering using a column
    generation approach where each column is a possible combination of
    intervals of size one.

    Args:
      problem: The protobuf of the model.
      after: a task to list of task dict that contains all tasks after a given
        task.
      lower_bound: A valid lower bound of the problem. It can be 0.

    Returns:
      a valid lower bound of the problem.
    """
    # Check this is a single mode problem.
    if (problem.is_rcpsp_max or problem.is_resource_investment or
            problem.is_consumer_producer):
        return lower_bound

    demand_map = collections.defaultdict(int)
    duration_map = {}
    all_active_tasks = list(range(1, len(problem.tasks) - 1))
    max_duration = 0
    sum_of_demands = 0

    for t in all_active_tasks:
        task = problem.tasks[t]
        if len(task.recipes) > 1:
            # Multi-mode task: the bound below is not valid, bail out.
            return 0
        recipe = task.recipes[0]
        duration_map[t] = recipe.duration
        for demand, resource in zip(recipe.demands, recipe.resources):
            demand_map[(t, resource)] = demand
            max_duration = max(max_duration, recipe.duration)
            sum_of_demands += demand

    print(f'Compute a bin-packing lower bound with {len(all_active_tasks)}' +
          ' active tasks',
          flush=True)
    all_combinations = []

    # Incrementally build all compatible combinations of tasks.
    for t in all_active_tasks:
        new_combinations = [[t]]

        for c in all_combinations:
            if AcceptNewCandidate(problem, after, demand_map, c, t):
                new_combinations.append(c + [t])

        all_combinations.extend(new_combinations)

    print(f'  - created {len(all_combinations)} combinations')
    if len(all_combinations) > 5000000:
        return lower_bound  # Abort if too large.

    # Solve the selection model.

    # TODO(user): a few possible improvements:
    # 1/ use "dominating" columns, i.e. if you can add a task to a column,
    #    then do not use that column.
    # 2/ Merge all task with exactly same demands into one.
    model = cp_model.CpModel()
    model.SetName(f'lower_bound_{problem.name}')

    vars_per_task = collections.defaultdict(list)
    all_vars = []
    for i, c in enumerate(all_combinations):
        min_duration = max_duration
        for t in c:
            min_duration = min(min_duration, duration_map[t])
        # Bug fix: the original named this var f'count_{t}' with the
        # loop-leaked last task of c, producing duplicate names across
        # combinations; index by combination instead.
        count = model.NewIntVar(0, min_duration, f'count_{i}')
        all_vars.append(count)
        for t in c:
            vars_per_task[t].append(count)

    # Each task must be performed.
    for t in all_active_tasks:
        model.Add(sum(vars_per_task[t]) >= duration_map[t])

    # Objective.
    objective_var = model.NewIntVar(lower_bound, sum_of_demands,
                                    'objective_var')
    model.Add(objective_var == sum(all_vars))
    model.Minimize(objective_var)

    # Solve model.
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 16
    solver.parameters.max_time_in_seconds = _PREEMPTIVE_LB_TIME_LIMIT.value
    status = solver.Solve(model)
    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
        status_str = 'optimal' if status == cp_model.OPTIMAL else ''
        lower_bound = max(lower_bound, int(solver.BestObjectiveBound()))
        print(f'  - {status_str} static lower bound = {lower_bound}',
              flush=True)

    return lower_bound


def main(_):
    rcpsp_parser = pywraprcpsp.RcpspParser()
    rcpsp_parser.ParseFile(_INPUT.value)

    problem = rcpsp_parser.Problem()
    PrintProblemStatistics(problem)

    intervals_of_tasks, after = AnalyseDependencyGraph(problem)
    delays, initial_solution, optimal_found = ComputeDelaysBetweenNodes(
        problem, intervals_of_tasks)

    last_task = len(problem.tasks) - 1
    key = (0, last_task)
    lower_bound = delays[key][0] if key in delays else 0
    if not optimal_found and _PREEMPTIVE_LB_TIME_LIMIT.value > 0.0:
        lower_bound = ComputePreemptiveLowerBound(problem, after, lower_bound)

    SolveRcpsp(problem=problem,
               proto_file=_OUTPUT_PROTO.value,
               params=_PARAMS.value,
               active_tasks=set(range(1, last_task)),
               source=0,
               sink=last_task,
               intervals_of_tasks=intervals_of_tasks,
               delays=delays,
               in_main_solve=True,
               initial_solution=initial_solution,
               lower_bound=lower_bound)


if __name__ == '__main__':
    app.run(main)