Files
ortools-clone/examples/python/line_balancing_sat.py

363 lines
11 KiB
Python
Raw Permalink Normal View History

2022-07-08 14:49:51 +02:00
#!/usr/bin/env python3
2025-01-10 11:35:44 +01:00
# Copyright 2010-2025 Google LLC
2022-07-08 14:49:51 +02:00
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2023-07-01 06:06:53 +02:00
2022-07-08 14:49:51 +02:00
"""Reader and solver of the simple assembly line balancing problem (SALBP).
from https://assembly-line-balancing.de/salbp/:
The simple assembly line balancing problem (SALBP) is the basic optimization
problem in assembly line balancing research. Given is a set of tasks each of
which has a deterministic task time. The tasks are partially ordered by
precedence relations defining a precedence graph as depicted below.
It reads .alb files:
https://assembly-line-balancing.de/wp-content/uploads/2017/01/format-ALB.pdf
and solves the corresponding problem.
"""
import collections
import re
from typing import Dict, Sequence
2022-07-08 14:49:51 +02:00
from absl import app
from absl import flags
2025-07-23 17:38:49 +02:00
2022-07-08 14:49:51 +02:00
from ortools.sat.python import cp_model
2023-07-01 06:06:53 +02:00
# Command-line flags. Each holder exposes the parsed value through `.value`
# once absl has processed the command line.
_INPUT = flags.DEFINE_string(
    name="input",
    default="",
    help="Input file to parse and solve.",
)
_PARAMS = flags.DEFINE_string(
    name="params",
    default="",
    help="Sat solver parameters.",
)
_OUTPUT_PROTO = flags.DEFINE_string(
    name="output_proto",
    default="",
    help="Output file to write the cp_model proto to.",
)
_MODEL = flags.DEFINE_string(
    name="model",
    default="boolean",
    help="Model used: boolean, scheduling, greedy",
)
2022-07-08 14:49:51 +02:00
2024-07-22 11:30:12 +02:00
class SectionInfo:
    """Holds the data parsed from one section of the input file.

    A section stores at most one of three payload shapes: a single integer
    (`value`), a key -> value mapping (`index_map`), or a set of integer
    pairs (`set_of_pairs`).
    """

    def __init__(self):
        # Single integer payload; None until a lone number has been stored.
        self.value = None
        # Integer key -> integer value payload.
        self.index_map = {}
        # Set of (left, right) integer pairs.
        self.set_of_pairs = set()

    def __str__(self):
        # Show the first non-empty payload, in the order index_map,
        # set_of_pairs, value; an empty section prints bare parentheses.
        if self.index_map:
            payload = f"index_map={self.index_map}"
        elif self.set_of_pairs:
            payload = f"set_of_pairs={self.set_of_pairs}"
        elif self.value is not None:
            payload = f"value={self.value}"
        else:
            payload = ""
        return f"SectionInfo({payload})"
2022-07-08 14:49:51 +02:00
def read_problem(filename: str) -> Dict[str, SectionInfo]:
    """Parses a .alb file into a map from section name to its content.

    Args:
      filename: path of the .alb file to read.

    Returns:
      A dict keyed by section name (the text between '<' and '>'). Each value
      is the SectionInfo filled from the lines that follow that header.
    """
    section_header = re.compile(r"<([\w\s]+)>")
    single_number = re.compile(r"^([0-9]+)$")
    key_value = re.compile(r"^([0-9]+)\s+([0-9]+)$")
    comma_pair = re.compile(r"^([0-9]+),([0-9]+)$")

    sections: Dict[str, SectionInfo] = {}
    # Data lines seen before the first header land in this unregistered
    # placeholder, so they never appear in the result.
    active = SectionInfo()

    with open(filename, "r") as input_file:
        print(f"Reading problem from '{filename}'")

        for raw_line in input_file:
            text = raw_line.strip()
            if not text:
                continue

            header = section_header.fullmatch(text)
            if header is not None:
                name = header.group(1)
                # "<end>" opens no section: following data lines keep going
                # to the section that was already active.
                if name != "end":
                    active = SectionInfo()
                    sections[name] = active
                continue

            number = single_number.fullmatch(text)
            if number is not None:
                active.value = int(number.group(1))
                continue

            mapping = key_value.fullmatch(text)
            if mapping is not None:
                key, value = (int(token) for token in mapping.groups())
                active.index_map[key] = value
                continue

            pair = comma_pair.fullmatch(text)
            if pair is not None:
                left, right = (int(token) for token in pair.groups())
                active.set_of_pairs.add((left, right))
                continue

            print(f"Unrecognized line '{text}'")

    return sections
2022-07-08 14:49:51 +02:00
def print_stats(problem: Dict[str, SectionInfo]) -> None:
    """Prints a header followed by one line per section of the problem.

    Args:
      problem: map from section name to section content; each value is
        rendered with its string conversion.
    """
    print("Problem Statistics")
    for section_name in problem:
        section = problem[section_name]
        print(f" - {section_name}: {section}")
2022-07-08 14:49:51 +02:00
def solve_problem_greedily(problem: Dict[str, SectionInfo]) -> Dict[int, int]:
    """Compute a greedy solution.

    A task becomes a candidate once all its predecessors have been placed.
    The current pod is filled by repeatedly picking the candidate that leaves
    the smallest non-negative residual capacity; when no candidate fits, a
    new pod is opened.

    Args:
      problem: parsed sections. Reads "number of tasks", "precedence
        relations", "task times" and "cycle time".

    Returns:
      A map from task (1-based) to pod index (0-based). It is empty when the
      number of tasks is unknown, and partial when no candidate is left
      before every task is placed (e.g. cyclic precedences).

    Raises:
      ValueError: if no ready task fits in an empty pod, i.e. some task is
        longer than the cycle time. The original code looped forever here.
    """
    print("Solving using a Greedy heuristics")

    num_tasks = problem["number of tasks"].value
    if num_tasks is None:
        return {}

    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the data.
    precedences = problem["precedence relations"].set_of_pairs
    durations = problem["task times"].index_map
    cycle_time = problem["cycle time"].value

    # weights[t] counts the predecessors of t that are not placed yet.
    weights = collections.defaultdict(int)
    successors = collections.defaultdict(list)
    # Start with every task, then drop those having at least one predecessor.
    candidates = set(all_tasks)
    for before, after in precedences:
        weights[after] += 1
        successors[before].append(after)
        candidates.discard(after)

    assignment: Dict[int, int] = {}
    current_pod = 0
    residual_capacity = cycle_time

    while len(assignment) < num_tasks:
        if not candidates:
            print("error empty")
            break

        # Select the candidate with the smallest non-negative slack. Starting
        # from "no best yet" (rather than from cycle_time) lets a
        # zero-duration task be selected on a fresh pod.
        best = -1
        best_slack = None
        for c in candidates:
            slack = residual_capacity - durations[c]
            if slack < 0:
                continue
            if best_slack is None or slack < best_slack:
                best = c
                best_slack = slack

        if best == -1:
            if residual_capacity == cycle_time:
                # Nothing fits even in an empty pod: opening another pod
                # would never make progress.
                raise ValueError(
                    "No ready task fits in an empty pod: a task duration"
                    f" exceeds the cycle time ({cycle_time})."
                )
            current_pod += 1
            residual_capacity = cycle_time
            continue

        candidates.remove(best)
        assignment[best] = current_pod
        residual_capacity = best_slack

        # Release the successors whose predecessors are now all placed.
        for succ in successors[best]:
            weights[succ] -= 1
            if weights[succ] == 0:
                candidates.add(succ)
                del weights[succ]

    print(f" greedy solution uses {current_pod + 1} pods.")
    return assignment
def solve_problem_with_boolean_model(
    problem: Dict[str, SectionInfo], hint: Dict[int, int]
) -> None:
    """Solve the given problem with a Boolean CP-SAT model.

    Builds one Boolean per (task, pod) assignment, minimizes the number of
    active pods, optionally exports the model proto, and runs the solver with
    search logging enabled. Nothing is returned.

    Args:
      problem: parsed sections. Reads "number of tasks", "task times",
        "precedence relations" and "cycle time".
      hint: map from task to pod, typically the greedy solution. It may be
        empty or partial; tasks missing from it simply get no hint.
    """
    print("Solving using the Boolean model")

    # Problem data.
    num_tasks = problem["number of tasks"].value
    if num_tasks is None:
        return
    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the problem.
    durations = problem["task times"].index_map
    precedences = problem["precedence relations"].set_of_pairs
    cycle_time = problem["cycle time"].value

    # The hint bounds the number of pods. Without one, fall back to one pod
    # per task, the same bound the scheduling model uses. The former
    # fallback of num_tasks - 1 yields zero pods for a single task.
    num_pods = max(hint.values()) + 1 if hint else num_tasks
    all_pods = range(num_pods)

    model = cp_model.CpModel()

    # assign[t, p] indicates if task t is done on pod p.
    assign = {}
    # possible[t, p] indicates if task t is possible on pod p.
    possible = {}

    # Create the variables.
    for t in all_tasks:
        for p in all_pods:
            assign[t, p] = model.new_bool_var(f"assign_{t}_{p}")
            possible[t, p] = model.new_bool_var(f"possible_{t}_{p}")

    # active[p] indicates if pod p is active.
    active = [model.new_bool_var(f"active_{p}") for p in all_pods]

    # Each task is done on exactly one pod.
    for t in all_tasks:
        model.add_exactly_one([assign[t, p] for p in all_pods])

    # Total tasks assigned to one pod cannot exceed cycle time.
    for p in all_pods:
        model.add(sum(assign[t, p] * durations[t] for t in all_tasks) <= cycle_time)

    # Maintain the possible variables:
    #   possible at pod p -> possible at any pod after p
    for t in all_tasks:
        for p in range(num_pods - 1):
            model.add_implication(possible[t, p], possible[t, p + 1])

    # Link the assign and possible variables.
    for t in all_tasks:
        for p in all_pods:
            model.add_implication(assign[t, p], possible[t, p])
            # NOTE(review): p == 1 is skipped here, as in the original code;
            # confirm whether `p > 0` was intended.
            if p > 1:
                model.add_implication(assign[t, p], ~possible[t, p - 1])

    # Precedences: if `before` is on pod p, `after` is not possible on p - 1.
    for before, after in precedences:
        for p in range(1, num_pods):
            model.add_implication(assign[before, p], ~possible[after, p - 1])

    # Link active variables with the assign one: a pod is active if and only
    # if at least one task is assigned to it.
    for p in all_pods:
        all_assign_vars = [assign[t, p] for t in all_tasks]
        for a in all_assign_vars:
            model.add_implication(a, active[p])
        model.add_bool_or(all_assign_vars + [~active[p]])

    # Force pods to be contiguous. This is critical to get good lower bounds
    # on the objective, even if it makes feasibility harder.
    for p in range(1, num_pods):
        model.add_implication(~active[p - 1], ~active[p])
        for t in all_tasks:
            model.add_implication(~active[p], possible[t, p - 1])

    # Objective.
    model.minimize(sum(active))

    # Add search hinting from the greedy solution. Tasks absent from the
    # hint are skipped instead of raising KeyError.
    for t in all_tasks:
        if t in hint:
            model.add_hint(assign[t, hint[t]], 1)

    if _OUTPUT_PROTO.value:
        print(f"Writing proto to {_OUTPUT_PROTO.value}")
        model.export_to_file(_OUTPUT_PROTO.value)

    # Solve model.
    solver = cp_model.CpSolver()
    if _PARAMS.value:
        solver.parameters.parse_text_format(_PARAMS.value)
    solver.parameters.log_search_progress = True
    solver.solve(model)
2022-07-08 14:49:51 +02:00
def solve_problem_with_scheduling_model(
    problem: Dict[str, SectionInfo], hint: Dict[int, int]
) -> None:
    """Solve the given problem using a cumulative model.

    Each task is a unit-size interval starting at its pod index, with its
    duration as demand on a cumulative resource of capacity `cycle_time`. An
    extra interval ending at `num_pods + 1` with demand `cycle_time` starts
    at the objective variable, which is minimized. The model is optionally
    exported, then solved with search logging enabled. Nothing is returned.

    Args:
      problem: parsed sections. Reads "number of tasks", "task times",
        "precedence relations" and "cycle time".
      hint: map from task to pod, typically the greedy solution. It may be
        empty or partial; tasks missing from it simply get no hint.
    """
    print("Solving using the scheduling model")

    # Problem data.
    num_tasks = problem["number of tasks"].value
    if num_tasks is None:
        return

    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the data.
    durations = problem["task times"].index_map
    precedences = problem["precedence relations"].set_of_pairs
    cycle_time = problem["cycle time"].value

    # The hint bounds the number of pods; without one, allow one per task.
    num_pods = max(hint.values()) + 1 if hint else num_tasks

    model = cp_model.CpModel()

    # pods[t] indicates on which pod the task is performed.
    pods = {}
    for t in all_tasks:
        pods[t] = model.new_int_var(0, num_pods - 1, f"pod_{t}")

    # Create the intervals: one unit-size interval per task, located at its
    # pod, with the task duration as demand.
    intervals = []
    demands = []
    for t in all_tasks:
        interval = model.new_fixed_size_interval_var(pods[t], 1, "")
        intervals.append(interval)
        demands.append(durations[t])

    # Add terminating interval as the objective. It consumes the full
    # capacity from obj_var up to num_pods + 1.
    obj_var = model.new_int_var(1, num_pods, "obj_var")
    obj_size = model.new_int_var(1, num_pods, "obj_duration")
    obj_interval = model.new_interval_var(
        obj_var, obj_size, num_pods + 1, "obj_interval"
    )
    intervals.append(obj_interval)
    demands.append(cycle_time)

    # Cumulative constraint.
    model.add_cumulative(intervals, demands, cycle_time)

    # Precedences.
    for before, after in precedences:
        model.add(pods[after] >= pods[before])

    # Objective.
    model.minimize(obj_var)

    # Add search hinting from the greedy solution. Tasks absent from the
    # hint are skipped instead of raising KeyError.
    for t in all_tasks:
        if t in hint:
            model.add_hint(pods[t], hint[t])

    if _OUTPUT_PROTO.value:
        # The space after "to" was missing, unlike in the Boolean model.
        print(f"Writing proto to {_OUTPUT_PROTO.value}")
        model.export_to_file(_OUTPUT_PROTO.value)

    # Solve model.
    solver = cp_model.CpSolver()
    if _PARAMS.value:
        solver.parameters.parse_text_format(_PARAMS.value)
    solver.parameters.log_search_progress = True
    solver.solve(model)
2022-07-08 14:49:51 +02:00
def main(argv: Sequence[str]) -> None:
    """Reads the input problem, runs the greedy heuristic, then the model.

    Args:
      argv: positional command-line arguments; only the program name is
        accepted.

    Raises:
      app.UsageError: if extra positional arguments are given.
    """
    if len(argv) > 1:
        raise app.UsageError("Too many command-line arguments.")

    problem = read_problem(_INPUT.value)
    print_stats(problem)
    greedy_solution = solve_problem_greedily(problem)

    # Any model name other than these two (e.g. "greedy") stops after the
    # greedy heuristic that has already run.
    model_solvers = {
        "boolean": solve_problem_with_boolean_model,
        "scheduling": solve_problem_with_scheduling_model,
    }
    solve = model_solvers.get(_MODEL.value)
    if solve is not None:
        solve(problem, greedy_solution)


if __name__ == "__main__":
    app.run(main)