polish python examples: use string interpolations; remove many mypy warnings

This commit is contained in:
Laurent Perron
2024-07-25 15:46:57 -07:00
parent ad3e00e441
commit 754d6a4d0f
21 changed files with 285 additions and 300 deletions

View File

@@ -25,7 +25,6 @@ from google.protobuf import text_format
from ortools.linear_solver.python import model_builder as mb
from ortools.sat.python import cp_model
FLAGS = flags.FLAGS
_OUTPUT_PROTO = flags.DEFINE_string(
"output_proto", "", "Output file to write the cp_model proto to."
@@ -225,7 +224,6 @@ def create_state_graph(items, max_capacity):
new_state = current_state + size * (card + 1)
if new_state > max_capacity:
break
new_state_index = -1
if new_state in state_to_index:
new_state_index = state_to_index[new_state]
else:

View File

@@ -42,24 +42,24 @@ def solve_assignment():
[0, 1, 0, 1], # Workers 1, 3
[0, 1, 1, 0], # Workers 1, 2
[1, 1, 0, 0], # Workers 0, 1
[1, 0, 1, 0],
] # Workers 0, 2
[1, 0, 1, 0], # Workers 0, 2
]
group2 = [
[0, 0, 1, 1], # Workers 6, 7
[0, 1, 0, 1], # Workers 5, 7
[0, 1, 1, 0], # Workers 5, 6
[1, 1, 0, 0], # Workers 4, 5
[1, 0, 0, 1],
] # Workers 4, 7
[1, 0, 0, 1], # Workers 4, 7
]
group3 = [
[0, 0, 1, 1], # Workers 10, 11
[0, 1, 0, 1], # Workers 9, 11
[0, 1, 1, 0], # Workers 9, 10
[1, 0, 1, 0], # Workers 8, 10
[1, 0, 0, 1],
] # Workers 8, 11
[1, 0, 0, 1], # Workers 8, 11
]
sizes = [10, 7, 3, 12, 15, 4, 11, 5]
total_size_max = 15
@@ -73,10 +73,9 @@ def solve_assignment():
model = cp_model.CpModel()
# Variables
selected = [
[model.new_bool_var("x[%i,%i]" % (i, j)) for j in all_tasks]
for i in all_workers
[model.new_bool_var(f"x[{i},{j}]") for j in all_tasks] for i in all_workers
]
works = [model.new_bool_var("works[%i]" % i) for i in all_workers]
works = [model.new_bool_var(f"works[{i}]") for i in all_workers]
# Constraints
@@ -107,21 +106,16 @@ def solve_assignment():
status = solver.solve(model)
if status == cp_model.OPTIMAL:
print("Total cost = %i" % solver.objective_value)
print(f"Total cost = {solver.objective_value}")
print()
for i in all_workers:
for j in all_tasks:
if solver.boolean_value(selected[i][j]):
print(
"Worker ", i, " assigned to task ", j, " Cost = ", cost[i][j]
)
print(f"Worker {i} assigned to task {j} with Cost = {cost[i][j]}")
print()
print("Statistics")
print(" - conflicts : %i" % solver.num_conflicts)
print(" - branches : %i" % solver.num_branches)
print(" - wall time : %f s" % solver.wall_time)
print(solver.response_stats())
def main(argv: Sequence[str]) -> None:

View File

@@ -40,10 +40,10 @@ class SolutionPrinter(cp_model.CpSolverSolutionCallback):
self.__item_in_group = item_in_group
def on_solution_callback(self):
print("Solution %i" % self.__solution_count)
print(f"Solution {self.__solution_count}")
self.__solution_count += 1
print(" objective value = %i" % self.objective_value)
print(f" objective value = {self.objective_value}")
groups = {}
sums = {}
for g in self.__all_groups:
@@ -56,11 +56,11 @@ class SolutionPrinter(cp_model.CpSolverSolutionCallback):
for g in self.__all_groups:
group = groups[g]
print("group %i: sum = %0.2f [" % (g, sums[g]), end="")
print(f"group {g}: sum = {sums[g]:0.2f} [", end="")
for item in group:
value = self.__values[item]
color = self.__colors[item]
print(" (%i, %i, %i)" % (item, value, color), end="")
print(f" ({item}, {value}, {color})", end="")
print("]")
@@ -97,7 +97,9 @@ def main(argv: Sequence[str]) -> None:
if colors[i] == color:
items_per_color[color].append(i)
print(f"Model has {num_items} items, {num_groups} groups, and {num_colors} colors")
print(
f"Model has {num_items} items, {num_groups} groups, and" f" {num_colors} colors"
)
print(f" average sum per group = {average_sum_per_group}")
# Model.
@@ -107,7 +109,7 @@ def main(argv: Sequence[str]) -> None:
item_in_group = {}
for i in all_items:
for g in all_groups:
item_in_group[(i, g)] = model.new_bool_var("item %d in group %d" % (i, g))
item_in_group[(i, g)] = model.new_bool_var(f"item {i} in group {g}")
# Each group must have the same size.
for g in all_groups:
@@ -135,9 +137,7 @@ def main(argv: Sequence[str]) -> None:
color_in_group = {}
for g in all_groups:
for c in all_colors:
color_in_group[(c, g)] = model.new_bool_var(
"color %d is in group %d" % (c, g)
)
color_in_group[(c, g)] = model.new_bool_var(f"color {c} is in group {g}")
# Item is in a group implies its color is in that group.
for i in all_items:
@@ -174,11 +174,8 @@ def main(argv: Sequence[str]) -> None:
status = solver.solve(model, solution_printer)
if status == cp_model.OPTIMAL:
print("Optimal epsilon: %i" % solver.objective_value)
print("Statistics")
print(" - conflicts : %i" % solver.num_conflicts)
print(" - branches : %i" % solver.num_branches)
print(" - wall time : %f s" % solver.wall_time)
print(f"Optimal epsilon: {solver.objective_value}")
print(solver.response_stats())
else:
print("No solution found")

View File

@@ -89,24 +89,24 @@ def chemical_balance():
# Creates a solver and solves.
solver = cp_model.CpSolver()
status = solver.solve(model)
print(f"Status = {solver.status_name(status)}")
# The objective value of the solution.
print(f"Optimal objective value = {solver.objective_value / 10000.0}")
if status == cp_model.OPTIMAL:
# The objective value of the solution.
print(f"Optimal objective value = {solver.objective_value / 10000.0}")
for s in all_sets:
print(
f" {chemical_set[s][0]} = {solver.value(set_vars[s]) / 1000.0}",
end=" ",
)
print()
for p in all_products:
name = max_quantities[p][0]
max_quantity = max_quantities[p][1]
quantity = sum(
solver.value(set_vars[s]) / 1000.0 * chemical_set[s][p + 1]
for s in all_sets
)
print(f"{name}: {quantity} out of {max_quantity}")
for s in all_sets:
print(
f" {chemical_set[s][0]} = {solver.value(set_vars[s]) / 1000.0}",
end=" ",
)
print()
for p in all_products:
name = max_quantities[p][0]
max_quantity = max_quantities[p][1]
quantity = sum(
solver.value(set_vars[s]) / 1000.0 * chemical_set[s][p + 1]
for s in all_sets
)
print(f"{name}: {quantity:.3f} out of {max_quantity}")
def main(argv: Sequence[str]) -> None:

View File

@@ -33,15 +33,15 @@ from ortools.sat.python import cp_model
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self):
def __init__(self) -> None:
cp_model.CpSolverSolutionCallback.__init__(self)
self.__solution_count = 0
def on_solution_callback(self):
def on_solution_callback(self) -> None:
"""Called at each new solution."""
print(
"Solution %i, time = %f s, objective = %i"
% (self.__solution_count, self.wall_time, self.objective_value)
f"Solution {self.__solution_count}, time = {self.wall_time} s,"
f" objective = {self.objective_value}"
)
self.__solution_count += 1
@@ -84,7 +84,7 @@ def flexible_jobshop() -> None:
max_task_duration = max(max_task_duration, alternative[0])
horizon += max_task_duration
print("Horizon = %i" % horizon)
print(f"Horizon = {horizon}")
# Global storage of variables.
intervals_per_resources = collections.defaultdict(list)
@@ -112,7 +112,7 @@ def flexible_jobshop() -> None:
max_duration = max(max_duration, alt_duration)
# Create main interval for the task.
suffix_name = "_j%i_t%i" % (job_id, task_id)
suffix_name = f"_j{job_id}_t{task_id}"
start = model.new_int_var(0, horizon, "start" + suffix_name)
duration = model.new_int_var(
min_duration, max_duration, "duration" + suffix_name
@@ -134,7 +134,7 @@ def flexible_jobshop() -> None:
if num_alternatives > 1:
l_presences = []
for alt_id in all_alternatives:
alt_suffix = "_j%i_t%i_a%i" % (job_id, task_id, alt_id)
alt_suffix = f"_j{job_id}_t{task_id}_a{alt_id}"
l_presence = model.new_bool_var("presence" + alt_suffix)
l_start = model.new_int_var(0, horizon, "start" + alt_suffix)
l_duration = task[alt_id][0]
@@ -181,28 +181,25 @@ def flexible_jobshop() -> None:
status = solver.solve(model, solution_printer)
# Print final solution.
for job_id in all_jobs:
print(f"Job {job_id}")
for task_id in range(len(jobs[job_id])):
start_value = solver.value(starts[(job_id, task_id)])
machine: int = -1
task_duration: int = -1
selected: int = -1
for alt_id in range(len(jobs[job_id][task_id])):
if solver.boolean_value(presences[(job_id, task_id, alt_id)]):
task_duration = jobs[job_id][task_id][alt_id][0]
machine = jobs[job_id][task_id][alt_id][1]
selected = alt_id
print(
f" task_{job_id}_{task_id} starts at {start_value} (alt {selected}, machine {machine}, duration {task_duration})"
)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
print(f"Optimal objective value: {solver.objective_value}")
for job_id in all_jobs:
print(f"Job {job_id}")
for task_id, task in enumerate(jobs[job_id]):
start_value = solver.value(starts[(job_id, task_id)])
machine: int = -1
task_duration: int = -1
selected: int = -1
for alt_id, alt in enumerate(task):
if solver.boolean_value(presences[(job_id, task_id, alt_id)]):
task_duration, machine = alt
selected = alt_id
print(
f" task_{job_id}_{task_id} starts at {start_value} (alt"
f" {selected}, machine {machine}, duration {task_duration})"
)
print(f"solve status: {solver.status_name(status)}")
print(f"Optimal objective value: {solver.objective_value}")
print("Statistics")
print(f" - conflicts : {solver.num_conflicts}")
print(f" - branches : {solver.num_branches}")
print(f" - wall time : {solver.wall_time} s")
print(solver.response_stats())
flexible_jobshop()

View File

@@ -154,12 +154,10 @@ def main(_) -> None:
performed_machine = 1 - solver.value(performed[i])
start_of_task = solver.value(starts[i])
print(
f" - Job {i} starts at {start_of_task} on machine {performed_machine}"
f" - Job {i} starts at {start_of_task} on machine"
f" {performed_machine}"
)
print("Statistics")
print(f" - conflicts : {solver.num_conflicts}")
print(f" - branches : {solver.num_branches}")
print(f" - wall time : {solver.wall_time} s")
print(solver.response_stats())
if __name__ == "__main__":

View File

@@ -76,18 +76,13 @@ def solve_golomb_ruler(order: int, params: str) -> None:
status = solver.solve(model, solution_printer)
# Print solution.
print(f"status: {solver.status_name(status)}")
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
for idx, var in enumerate(marks):
print(f"mark[{idx}]: {solver.value(var)}")
intervals = [solver.value(diff) for diff in diffs]
intervals.sort()
print(f"intervals: {intervals}")
print("Statistics:")
print(f"- conflicts: {solver.num_conflicts}")
print(f"- branches : {solver.num_branches}")
print(f"- wall time: {solver.wall_time}s\n")
print(solver.response_stats())
def main(argv: Sequence[str]) -> None:

View File

@@ -74,7 +74,7 @@ def print_matrix(game: list[list[int]]) -> None:
if game[i][j] == 0:
line += " ."
else:
line += "% 3s" % game[i][j]
line += f"{game[i][j]:3}"
print(line)
@@ -102,7 +102,7 @@ def build_puzzle(problem: int) -> Union[None, list[list[int]]]:
elif problem == 3:
# Problems from the book:
# Gyora Bededek: "Hidato: 2000 Pure Logic Puzzles"
# Gyora Bededek: 'Hidato: 2000 Pure Logic Puzzles'
# Problem 1 (Practice)
puzzle = [
[0, 0, 20, 0, 0],
@@ -156,15 +156,15 @@ def solve_hidato(puzzle: list[list[int]], index: int) -> None:
c = len(puzzle[0])
if not visualization.RunFromIPython():
print("")
print("----- Solving problem %i -----" % index)
print(f"----- Solving problem {index} -----")
print("")
print(("Initial game (%i x %i)" % (r, c)))
print(f"Initial game ({r} x {c})")
print_matrix(puzzle)
#
# Declare variables.
#
positions = [model.new_int_var(0, r * c - 1, "p[%i]" % i) for i in range(r * c)]
positions = [model.new_int_var(0, r * c - 1, f"p[{i}]") for i in range(r * c)]
#
# Constraints.
@@ -202,7 +202,7 @@ def solve_hidato(puzzle: list[list[int]], index: int) -> None:
color = "white" if puzzle[y][x] == 0 else "lightgreen"
output.AddRectangle(x, r - y - 1, 1, 1, color, "black", str(i + 1))
output.AddTitle("Puzzle %i solved in %f s" % (index, solver.wall_time))
output.AddTitle(f"Puzzle {index} solved in {solver.wall_time:.2f} s")
output.Display()
else:
print_solution(
@@ -210,10 +210,7 @@ def solve_hidato(puzzle: list[list[int]], index: int) -> None:
r,
c,
)
print("Statistics")
print(" - conflicts : %i" % solver.num_conflicts)
print(" - branches : %i" % solver.num_branches)
print(" - wall time : %f s" % solver.wall_time)
print(solver.response_stats())
def main(_):

View File

@@ -73,11 +73,11 @@ def jobshop_ft06_distance() -> None:
all_tasks = {}
for i in all_jobs:
for j in all_machines:
start_var = model.new_int_var(0, horizon, "start_%i_%i" % (i, j))
start_var = model.new_int_var(0, horizon, f"start_{i}_{j}")
duration = durations[i][j]
end_var = model.new_int_var(0, horizon, "end_%i_%i" % (i, j))
end_var = model.new_int_var(0, horizon, f"end_{i}_{j}")
interval_var = model.new_interval_var(
start_var, duration, end_var, "interval_%i_%i" % (i, j)
start_var, duration, end_var, f"interval_{i}_{j}"
)
all_tasks[(i, j)] = task_type(
start=start_var, end=end_var, interval=interval_var
@@ -101,16 +101,16 @@ def jobshop_ft06_distance() -> None:
arcs = []
for j1 in range(len(job_intervals)):
# Initial arc from the dummy node (0) to a task.
start_lit = model.new_bool_var("%i is first job" % j1)
start_lit = model.new_bool_var(f"{j1} is first job")
arcs.append((0, j1 + 1, start_lit))
# Final arc from an arc to the dummy node.
arcs.append((j1 + 1, 0, model.new_bool_var("%i is last job" % j1)))
arcs.append((j1 + 1, 0, model.new_bool_var(f"{j1} is last job")))
for j2 in range(len(job_intervals)):
if j1 == j2:
continue
lit = model.new_bool_var("%i follows %i" % (j2, j1))
lit = model.new_bool_var(f"{j2} follows {j1}")
arcs.append((j1 + 1, j2 + 1, lit))
# We add the reified precedence to link the literal with the
@@ -140,7 +140,8 @@ def jobshop_ft06_distance() -> None:
# Output solution.
if status == cp_model.OPTIMAL:
print("Optimal makespan: %i" % solver.objective_value)
print(f"Optimal makespan: {solver.objective_value}")
print(solver.response_stats())
jobshop_ft06_distance()

View File

@@ -66,11 +66,11 @@ def jobshop_ft06() -> None:
all_tasks = {}
for i in all_jobs:
for j in all_machines:
start_var = model.new_int_var(0, horizon, "start_%i_%i" % (i, j))
start_var = model.new_int_var(0, horizon, f"start_{i}_{j}")
duration = durations[i][j]
end_var = model.new_int_var(0, horizon, "end_%i_%i" % (i, j))
end_var = model.new_int_var(0, horizon, f"end_{i}_{j}")
interval_var = model.new_interval_var(
start_var, duration, end_var, "interval_%i_%i" % (i, j)
start_var, duration, end_var, f"interval_{i}_{j}"
)
all_tasks[(i, j)] = task_type(
start=start_var, end=end_var, interval=interval_var
@@ -113,7 +113,7 @@ def jobshop_ft06() -> None:
]
visualization.DisplayJobshop(starts, durations, machines, "FT06")
else:
print("Optimal makespan: %i" % solver.objective_value)
print(f"Optimal makespan: {solver.objective_value}")
jobshop_ft06()

View File

@@ -23,15 +23,15 @@ from ortools.sat.python import cp_model
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self):
def __init__(self) -> None:
cp_model.CpSolverSolutionCallback.__init__(self)
self.__solution_count = 0
def on_solution_callback(self):
def on_solution_callback(self) -> None:
"""Called at each new solution."""
print(
"Solution %i, time = %f s, objective = %i"
% (self.__solution_count, self.wall_time, self.objective_value)
f"Solution {self.__solution_count}, time = {self.wall_time} s,"
f" objective = {self.objective_value}"
)
self.__solution_count += 1
@@ -67,8 +67,7 @@ def jobshop_with_maintenance() -> None:
for job_id, job in enumerate(jobs_data):
for entry in enumerate(job):
task_id, task = entry
machine = task[0]
duration = task[1]
machine, duration = task
suffix = f"_{job_id}_{task_id}"
start_var = model.new_int_var(0, horizon, "start" + suffix)
end_var = model.new_int_var(0, horizon, "end" + suffix)
@@ -132,15 +131,15 @@ def jobshop_with_maintenance() -> None:
sol_line = " "
for assigned_task in assigned_jobs[machine]:
name = "job_%i_%i" % (assigned_task.job, assigned_task.index)
name = f"job_{assigned_task.job}_{assigned_task.index}"
# add spaces to output to align columns.
sol_line_tasks += "%-10s" % name
sol_line_tasks += f"{name:>10}"
start = assigned_task.start
duration = assigned_task.duration
sol_tmp = "[%i,%i]" % (start, start + duration)
sol_tmp = f"[{start}, {start + duration}]"
# add spaces to output to align columns.
sol_line += "%-10s" % sol_tmp
sol_line += f"{sol_tmp:>10}"
sol_line += "\n"
sol_line_tasks += "\n"
@@ -148,12 +147,9 @@ def jobshop_with_maintenance() -> None:
output += sol_line
# Finally print the solution found.
print("Optimal Schedule Length: %i" % solver.objective_value)
print(f"Optimal Schedule Length: {solver.objective_value}")
print(output)
print("Statistics")
print(" - conflicts : %i" % solver.num_conflicts)
print(" - branches : %i" % solver.num_branches)
print(" - wall time : %f s" % solver.wall_time)
print(solver.response_stats())
def main(argv: Sequence[str]) -> None:

View File

@@ -164,7 +164,7 @@ def solve_with_duplicate_items(
status = solver.solve(model)
# Report solution.
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
used = {i for i in range(num_items) if solver.boolean_value(is_used[i])}
data = pd.DataFrame(
{
@@ -266,7 +266,7 @@ def solve_with_duplicate_optional_items(
status = solver.solve(model)
# Report solution.
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
used = {i for i in range(num_items) if solver.boolean_value(is_used[i])}
data = pd.DataFrame(
{
@@ -387,7 +387,7 @@ def solve_with_rotations(data: pd.Series, max_height: int, max_width: int):
status = solver.solve(model)
# Report solution.
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
used = {i for i in range(num_items) if solver.boolean_value(is_used[i])}
data = pd.DataFrame(
{

View File

@@ -29,7 +29,7 @@ and solves the corresponding problem.
import collections
import re
from typing import Sequence
from typing import Dict, Sequence
from absl import app
from absl import flags
@@ -48,7 +48,7 @@ _MODEL = flags.DEFINE_string(
class SectionInfo:
"""Store model information for each section of the input file."""
"""Store problem information for each section of the input file."""
def __init__(self):
self.value = None
@@ -66,44 +66,43 @@ class SectionInfo:
return "SectionInfo()"
def read_model(filename):
"""Reads a .alb file and returns the model."""
def read_problem(filename: str) -> Dict[str, SectionInfo]:
"""Reads a .alb file and returns the problem."""
current_info = SectionInfo()
model = {}
problem: Dict[str, SectionInfo] = {}
with open(filename, "r") as input_file:
print(f"Reading model from '{filename}'")
section_name = ""
print(f"Reading problem from '{filename}'")
for line in input_file:
stripped_line = line.strip()
if not stripped_line:
continue
match_section_def = re.match(r"<([\w\s]+)>", stripped_line)
match_section_def = re.fullmatch(r"<([\w\s]+)>", stripped_line)
if match_section_def:
section_name = match_section_def.group(1)
if section_name == "end":
continue
current_info = SectionInfo()
model[section_name] = current_info
problem[section_name] = current_info
continue
match_single_number = re.match(r"^([0-9]+)$", stripped_line)
match_single_number = re.fullmatch(r"^([0-9]+)$", stripped_line)
if match_single_number:
current_info.value = int(match_single_number.group(1))
continue
match_key_value = re.match(r"^([0-9]+)\s+([0-9]+)$", stripped_line)
match_key_value = re.fullmatch(r"^([0-9]+)\s+([0-9]+)$", stripped_line)
if match_key_value:
key = int(match_key_value.group(1))
value = int(match_key_value.group(2))
current_info.index_map[key] = value
continue
match_pair = re.match(r"^([0-9]+),([0-9]+)$", stripped_line)
match_pair = re.fullmatch(r"^([0-9]+),([0-9]+)$", stripped_line)
if match_pair:
left = int(match_pair.group(1))
right = int(match_pair.group(2))
@@ -112,24 +111,26 @@ def read_model(filename):
print(f"Unrecognized line '{stripped_line}'")
return model
return problem
def print_stats(model) -> None:
print("Model Statistics")
for key, value in model.items():
def print_stats(problem: Dict[str, SectionInfo]) -> None:
print("Problem Statistics")
for key, value in problem.items():
print(f" - {key}: {value}")
def solve_model_greedily(model):
def solve_problem_greedily(problem: Dict[str, SectionInfo]) -> Dict[int, int]:
"""Compute a greedy solution."""
print("Solving using a Greedy heuristics")
num_tasks = model["number of tasks"].value
num_tasks = problem["number of tasks"].value
if num_tasks is None:
return {}
all_tasks = range(1, num_tasks + 1) # Tasks are 1 based in the data.
precedences = model["precedence relations"].set_of_pairs
durations = model["task times"].index_map
cycle_time = model["cycle time"].value
precedences = problem["precedence relations"].set_of_pairs
durations = problem["task times"].index_map
cycle_time = problem["cycle time"].value
weights = collections.defaultdict(int)
successors = collections.defaultdict(list)
@@ -142,7 +143,7 @@ def solve_model_greedily(model):
if after in candidates:
candidates.remove(after)
assignment = {}
assignment: Dict[int, int] = {}
current_pod = 0
residual_capacity = cycle_time
@@ -183,16 +184,20 @@ def solve_model_greedily(model):
return assignment
def solve_boolean_model(model, hint) -> None:
"""solve the given model."""
def solve_problem_with_boolean_model(
problem: Dict[str, SectionInfo], hint: Dict[int, int]
) -> None:
"""solve the given problem."""
print("Solving using the Boolean model")
# Model data
num_tasks = model["number of tasks"].value
all_tasks = range(1, num_tasks + 1) # Tasks are 1 based in the model.
durations = model["task times"].index_map
precedences = model["precedence relations"].set_of_pairs
cycle_time = model["cycle time"].value
# problem data
num_tasks = problem["number of tasks"].value
if num_tasks is None:
return
all_tasks = range(1, num_tasks + 1) # Tasks are 1 based in the problem.
durations = problem["task times"].index_map
precedences = problem["precedence relations"].set_of_pairs
cycle_time = problem["cycle time"].value
num_pods = max(p for _, p in hint.items()) + 1 if hint else num_tasks - 1
all_pods = range(num_pods)
@@ -272,16 +277,20 @@ def solve_boolean_model(model, hint) -> None:
solver.solve(model)
def solve_scheduling_model(model, hint):
"""solve the given model using a cumutive model."""
def solve_problem_with_scheduling_model(
problem: Dict[str, SectionInfo], hint: Dict[int, int]
) -> None:
"""solve the given problem using a cumulative model."""
print("Solving using the scheduling model")
# Model data
num_tasks = model["number of tasks"].value
# Problem data
num_tasks = problem["number of tasks"].value
if num_tasks is None:
return
all_tasks = range(1, num_tasks + 1) # Tasks are 1 based in the data.
durations = model["task times"].index_map
precedences = model["precedence relations"].set_of_pairs
cycle_time = model["cycle time"].value
durations = problem["task times"].index_map
precedences = problem["precedence relations"].set_of_pairs
cycle_time = problem["cycle time"].value
num_pods = max(p for _, p in hint.items()) + 1 if hint else num_tasks
@@ -339,14 +348,14 @@ def main(argv: Sequence[str]) -> None:
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
model = read_model(_INPUT.value)
print_stats(model)
greedy_solution = solve_model_greedily(model)
problem = read_problem(_INPUT.value)
print_stats(problem)
greedy_solution = solve_problem_greedily(problem)
if _MODEL.value == "boolean":
solve_boolean_model(model, greedy_solution)
solve_problem_with_boolean_model(problem, greedy_solution)
elif _MODEL.value == "scheduling":
solve_scheduling_model(model, greedy_solution)
solve_problem_with_scheduling_model(problem, greedy_solution)
if __name__ == "__main__":

View File

@@ -20,10 +20,11 @@ visit all boxes in order, and walk on each block in a 4x4x4 map exactly once.
Admissible moves are one step in one of the 6 directions:
x+, x-, y+, y-, z+(up), z-(down)
"""
from typing import Sequence, Tuple
from typing import Dict, Sequence, Tuple
from absl import app
from absl import flags
from google.protobuf import text_format
from ortools.sat.python import cp_model
@@ -31,12 +32,24 @@ _OUTPUT_PROTO = flags.DEFINE_string(
"output_proto", "", "Output file to write the cp_model proto to."
)
_PARAMS = flags.DEFINE_string(
"params", "num_search_workers:8,log_search_progress:true", "Sat solver parameters."
"params",
"num_search_workers:8,log_search_progress:true",
"Sat solver parameters.",
)
def add_neighbor(
size, x, y, z, dx, dy, dz, model, index_map, position_to_rank, arcs
size: int,
x: int,
y: int,
z: int,
dx: int,
dy: int,
dz: int,
model: cp_model.CpModel,
index_map: Dict[Tuple[int, int, int], int],
position_to_rank: Dict[Tuple[int, int, int], cp_model.IntVar],
arcs: list[Tuple[int, int, cp_model.LiteralT]],
) -> None:
"""Checks if the neighbor is valid, and adds it to the model."""
if (
@@ -57,7 +70,7 @@ def add_neighbor(
arcs.append((before_index, after_index, move_literal))
def escape_the_maze(params, output_proto) -> None:
def escape_the_maze(params: str, output_proto: str) -> None:
"""Escapes the maze."""
size = 4
boxes = [(0, 1, 0), (2, 0, 1), (1, 3, 1), (3, 1, 3)]
@@ -145,8 +158,8 @@ def escape_the_maze(params, output_proto) -> None:
elif position == end:
msg += " [end]"
else:
for b in range(len(boxes)):
if position == boxes[b]:
for b, box in enumerate(boxes):
if position == box:
msg += f" [boxes {b}]"
path[rank] = msg
print(path)

View File

@@ -30,10 +30,9 @@ from google.protobuf import text_format
from ortools.sat.python import cp_model
_PARAMS = flags.DEFINE_string(
"params", "num_search_workers:16, max_time_in_seconds:30", "Sat solver parameters."
)
_PROTO_FILE = flags.DEFINE_string(
"proto_file", "", "If not empty, output the proto to this file."
"params",
"num_search_workers:16, max_time_in_seconds:30",
"Sat solver parameters.",
)
# Recipes
@@ -81,7 +80,9 @@ class Recipe:
self.name = name
self.tasks = []
def add_task(self, resource_name, min_duration, max_duration):
def add_task(
self, resource_name: str, min_duration: int, max_duration: int
) -> "Recipe":
self.tasks.append(Task(resource_name, min_duration, max_duration))
return self
@@ -102,7 +103,7 @@ class Resource:
self.capacity = capacity
self.skills = []
def add_skill(self, skill_name, efficiency):
def add_skill(self, skill_name: str, efficiency: float) -> "Resource":
self.skills.append(Skill(skill_name, efficiency))
return self
@@ -233,7 +234,6 @@ def solve_with_cp_sat(
skill_name = task.name
suffix = f"_{order.unique_id}_batch{batch}_{skill_name}"
start = None
if previous_end is None:
start = model.new_int_var(start_work, horizon, f"start{suffix}")
orders_sequence_of_events[order_id].append(
@@ -245,7 +245,6 @@ def solve_with_cp_sat(
size = model.new_int_var(
task.min_duration, task.max_duration, f"size{suffix}"
)
end = None
if task == recipe.tasks[-1]:
# The order must end after the due_date. Ideally, exactly at the
# due_date.

View File

@@ -28,26 +28,26 @@ class NQueenSolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self, queens: list[cp_model.IntVar]):
cp_model.CpSolverSolutionCallback.__init__(self)
self.__queens = queens
self.__solution_count = 0
self.__start_time = time.time()
self._queens = queens
self._solution_count = 0
self._start_time = time.time()
@property
def solution_count(self) -> int:
return self.__solution_count
return self._solution_count
def on_solution_callback(self) -> None:
current_time = time.time()
print(
"Solution %i, time = %f s"
% (self.__solution_count, current_time - self.__start_time)
f"Solution{self._solution_count}, time ="
f" {current_time - self._start_time} s"
)
self.__solution_count += 1
self._solution_count += 1
all_queens = range(len(self.__queens))
all_queens = range(len(self._queens))
for i in all_queens:
for j in all_queens:
if self.value(self.__queens[j]) == i:
if self.value(self._queens[j]) == i:
# There is a queen in column j, row i.
print("Q", end=" ")
else:

View File

@@ -110,7 +110,7 @@ def add_soft_sequence_constraint(
for length in range(hard_min, soft_min):
for start in range(len(works) - length + 1):
span = negated_bounded_span(works, start, length)
name = ": under_span(start=%i, length=%i)" % (start, length)
name = f": under_span(start={start}, length={length})"
lit = model.new_bool_var(prefix + name)
span.append(lit)
model.add_bool_or(span)
@@ -124,7 +124,7 @@ def add_soft_sequence_constraint(
for length in range(soft_max + 1, hard_max + 1):
for start in range(len(works) - length + 1):
span = negated_bounded_span(works, start, length)
name = ": over_span(start=%i, length=%i)" % (start, length)
name = f": over_span(start={start}, length={length})"
lit = model.new_bool_var(prefix + name)
span.append(lit)
model.add_bool_or(span)
@@ -299,7 +299,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
for e in range(num_employees):
for s in range(num_shifts):
for d in range(num_days):
work[e, s, d] = model.new_bool_var("work%i_%i_%i" % (e, s, d))
work[e, s, d] = model.new_bool_var(f"work{e}_{s}_{d}")
# Linear terms of the objective in a minimization context.
obj_int_vars: list[cp_model.IntVar] = []
@@ -335,7 +335,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
soft_max,
hard_max,
max_cost,
"shift_constraint(employee %i, shift %i)" % (e, shift),
f"shift_constraint(employee {e}, shift {shift})",
)
obj_bool_vars.extend(variables)
obj_bool_coeffs.extend(coeffs)
@@ -355,8 +355,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
soft_max,
hard_max,
max_cost,
"weekly_sum_constraint(employee %i, shift %i, week %i)"
% (e, shift, w),
f"weekly_sum_constraint(employee {e}, shift {shift}, week {w})",
)
obj_int_vars.extend(variables)
obj_int_coeffs.extend(coeffs)
@@ -373,7 +372,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
model.add_bool_or(transition)
else:
trans_var = model.new_bool_var(
"transition (employee=%i, day=%i)" % (e, d)
f"transition (employee={e}, day={d})"
)
transition.append(trans_var)
model.add_bool_or(transition)
@@ -391,7 +390,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
model.add(worked == sum(works))
over_penalty = excess_cover_penalties[s - 1]
if over_penalty > 0:
name = "excess_demand(shift=%i, week=%i, day=%i)" % (s, w, d)
name = f"excess_demand(shift={s}, week={w}, day={d})"
excess = model.new_int_var(0, num_employees - min_demand, name)
model.add(excess == worked - min_demand)
obj_int_vars.append(excess)
@@ -404,7 +403,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
)
if output_proto:
print("Writing proto to %s" % output_proto)
print(f"Writing proto to {output_proto}")
with open(output_proto, "w") as text_file:
text_file.write(str(model))
@@ -428,7 +427,7 @@ def solve_shift_scheduling(params: str, output_proto: str):
for s in range(num_shifts):
if solver.boolean_value(work[e, s, d]):
schedule += shifts[s] + " "
print("worker %i: %s" % (e, schedule))
print(f"worker {e}: {schedule}")
print()
print("Penalties:")
for i, var in enumerate(obj_bool_vars):
@@ -442,16 +441,12 @@ def solve_shift_scheduling(params: str, output_proto: str):
for i, var in enumerate(obj_int_vars):
if solver.value(var) > 0:
print(
" %s violated by %i, linear penalty=%i"
% (var.name, solver.value(var), obj_int_coeffs[i])
f" {var.name} violated by {solver.value(var)}, linear"
f" penalty={obj_int_coeffs[i]}"
)
print()
print("Statistics")
print(" - status : %s" % solver.status_name(status))
print(" - conflicts : %i" % solver.num_conflicts)
print(" - branches : %i" % solver.num_branches)
print(" - wall time : %f s" % solver.wall_time)
print(solver.response_stats())
def main(_):

View File

@@ -40,17 +40,16 @@ _PREPROCESS = flags.DEFINE_bool(
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self):
def __init__(self) -> None:
cp_model.CpSolverSolutionCallback.__init__(self)
self.__solution_count = 0
def on_solution_callback(self):
"""Called after each new solution found."""
def on_solution_callback(self) -> None:
"""Called at each new solution."""
print(
"Solution %i, time = %f s, objective = %i"
% (self.__solution_count, self.wall_time, self.objective_value)
f"Solution {self.__solution_count}, time = {self.wall_time} s,"
f" objective = {self.objective_value}"
)
self.__solution_count += 1
def single_machine_scheduling():
@@ -393,9 +392,7 @@ def single_machine_scheduling():
if min_incoming_setup == 0:
continue
print(
"job %i has a min incoming setup of %i" % (job_id, min_incoming_setup)
)
print(f"job {job_id} has a min incoming setup of {min_incoming_setup}")
# We can transfer some setup times to the duration of the job.
job_durations[job_id] += min_incoming_setup
# Decrease corresponding incoming setup times.
@@ -414,7 +411,7 @@ def single_machine_scheduling():
horizon = sum(job_durations) + sum(
max(setup_times[i][j] for i in range(num_jobs + 1)) for j in range(num_jobs)
)
print("Greedy horizon =", horizon)
print(f"Greedy horizon = {horizon}")
# ----------------------------------------------------------------------------
# Global storage of variables.
@@ -429,10 +426,10 @@ def single_machine_scheduling():
release_date = release_dates[job_id]
due_date = due_dates[job_id] if due_dates[job_id] != -1 else horizon
print(
"job %2i: start = %5i, duration = %4i, end = %6i"
% (job_id, release_date, duration, due_date)
f"job {job_id:2}: start = {release_date:5}, duration = {duration:4},"
f" end = {due_date:6}"
)
name_suffix = "_%i" % job_id
name_suffix = f"_{job_id}"
start = model.new_int_var(release_date, due_date, "s" + name_suffix)
end = model.new_int_var(release_date, due_date, "e" + name_suffix)
interval = model.new_interval_var(start, duration, end, "i" + name_suffix)
@@ -460,7 +457,7 @@ def single_machine_scheduling():
if i == j:
continue
lit = model.new_bool_var("%i follows %i" % (j, i))
lit = model.new_bool_var(f"{j} follows {i}")
arcs.append((i + 1, j + 1, lit))
# We add the reified precedence to link the literal with the times of the
@@ -481,7 +478,7 @@ def single_machine_scheduling():
# ----------------------------------------------------------------------------
# Precedences.
for before, after in precedences:
print("job %i is after job %i" % (after, before))
print(f"job {after} is after job {before}")
model.add(ends[before] <= starts[after])
# ----------------------------------------------------------------------------
@@ -493,7 +490,7 @@ def single_machine_scheduling():
# ----------------------------------------------------------------------------
# Write problem to file.
if output_proto_file:
print("Writing proto to %s" % output_proto_file)
print(f"Writing proto to {output_proto_file}")
with open(output_proto_file, "w") as text_file:
text_file.write(str(model))
@@ -507,8 +504,8 @@ def single_machine_scheduling():
solver.solve(model, solution_printer)
for job_id in all_jobs:
print(
"job %i starts at %i end ends at %i"
% (job_id, solver.value(starts[job_id]), solver.value(ends[job_id]))
f"job {job_id} starts at {solver.value(starts[job_id])} end ends at"
f" {solver.value(ends[job_id])}"
)

View File

@@ -39,13 +39,10 @@ _PARAMS = flags.DEFINE_string(
)
def build_problem(problem_id):
def build_problem(
problem_id: int,
) -> tuple[int, list[int], int, list[tuple[int, int]]]:
"""Build problem data."""
capacities = None
num_colors = None
num_slabs = None
orders = None
if problem_id == 0:
capacities = [
# fmt:off
@@ -100,15 +97,22 @@ def build_problem(problem_id):
# fmt:on
]
elif problem_id == 3:
else: # problem_id == 3, default problem.
capacities = [0, 17, 44]
num_colors = 8
num_slabs = 10
orders = [ # (size, color)
# fmt:off
(4, 1), (22, 2), (9, 3), (5, 4), (8, 5), (3, 6), (3, 4), (4, 7),
(7, 4), (7, 8), (3, 6),
# fmt:on
(4, 1),
(22, 2),
(9, 3),
(5, 4),
(8, 5),
(3, 6),
(3, 4),
(4, 7),
(7, 4),
(7, 8),
(3, 6),
]
return (num_slabs, capacities, num_colors, orders)
@@ -117,7 +121,7 @@ def build_problem(problem_id):
class SteelMillSlabSolutionPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self, orders, assign, load, loss):
def __init__(self, orders, assign, load, loss) -> None:
cp_model.CpSolverSolutionCallback.__init__(self)
self.__orders = orders
self.__assign = assign
@@ -128,13 +132,13 @@ class SteelMillSlabSolutionPrinter(cp_model.CpSolverSolutionCallback):
self.__all_slabs = range(len(assign[0]))
self.__start_time = time.time()
def on_solution_callback(self):
def on_solution_callback(self) -> None:
"""Called on each new solution."""
current_time = time.time()
objective = sum(self.value(l) for l in self.__loss)
print(
"Solution %i, time = %f s, objective = %i"
% (self.__solution_count, current_time - self.__start_time, objective)
f"Solution {self.__solution_count}, time ="
f" {current_time - self.__start_time} s, objective = {objective}"
)
self.__solution_count += 1
orders_in_slab = [
@@ -143,25 +147,20 @@ class SteelMillSlabSolutionPrinter(cp_model.CpSolverSolutionCallback):
]
for s in self.__all_slabs:
if orders_in_slab[s]:
line = " - slab %i, load = %i, loss = %i, orders = [" % (
s,
self.value(self.__load[s]),
self.value(self.__loss[s]),
line = (
f" - slab {s}, load = {self.value(self.__load[s])}, loss ="
f" {self.value(self.__loss[s])}, orders = ["
)
for o in orders_in_slab[s]:
line += "#%i(w%i, c%i) " % (
o,
self.__orders[o][0],
self.__orders[o][1],
)
line += f"#{o}(w{self.__orders[o][0]}, c{self.__orders[o][1]})"
line += "]"
print(line)
def steel_mill_slab(problem, break_symmetries) -> None:
def steel_mill_slab(problem_id: int, break_symmetries: bool) -> None:
"""Solves the Steel Mill Slab Problem."""
### Load problem.
(num_slabs, capacities, num_colors, orders) = build_problem(problem)
num_slabs, capacities, num_colors, orders = build_problem(problem_id)
num_orders = len(orders)
num_capacities = len(capacities)
@@ -169,8 +168,8 @@ def steel_mill_slab(problem, break_symmetries) -> None:
all_colors = range(num_colors)
all_orders = range(len(orders))
print(
"Solving steel mill with %i orders, %i slabs, and %i capacities"
% (num_orders, num_slabs, num_capacities - 1)
f"Solving steel mill with {num_orders} orders, {num_slabs} slabs, and"
f" {num_capacities - 1} capacities"
)
# Compute auxiliary data.
@@ -193,14 +192,12 @@ def steel_mill_slab(problem, break_symmetries) -> None:
# Create the model and the decision variables.
model = cp_model.CpModel()
assign = [
[model.new_bool_var("assign_%i_to_slab_%i" % (o, s)) for s in all_slabs]
[model.new_bool_var(f"assign_{o}_to_slab_{s}") for s in all_slabs]
for o in all_orders
]
loads = [
model.new_int_var(0, max_capacity, "load_of_slab_%i" % s) for s in all_slabs
]
loads = [model.new_int_var(0, max_capacity, f"load_of_slab_{s}") for s in all_slabs]
color_is_in_slab = [
[model.new_bool_var("color_%i_in_slab_%i" % (c + 1, s)) for c in all_colors]
[model.new_bool_var(f"color_{c + 1}_in_slab_{s}") for c in all_colors]
for s in all_slabs
]
@@ -267,19 +264,19 @@ def steel_mill_slab(problem, break_symmetries) -> None:
# Create position variables if there are symmetries to be broken.
if break_symmetries and ordered_equivalent_orders:
print(
" - creating %i symmetry breaking constraints"
% len(ordered_equivalent_orders)
f" - creating {len(ordered_equivalent_orders)} symmetry breaking"
" constraints"
)
positions = {}
for p in ordered_equivalent_orders:
if p[0] not in positions:
positions[p[0]] = model.new_int_var(
0, num_slabs - 1, "position_of_slab_%i" % p[0]
0, num_slabs - 1, f"position_of_slab_{p[0]}"
)
model.add_map_domain(positions[p[0]], assign[p[0]])
if p[1] not in positions:
positions[p[1]] = model.new_int_var(
0, num_slabs - 1, "position_of_slab_%i" % p[1]
0, num_slabs - 1, f"position_of_slab_{p[1]}"
)
model.add_map_domain(positions[p[1]], assign[p[1]])
# Finally add the symmetry breaking constraint.
@@ -287,7 +284,7 @@ def steel_mill_slab(problem, break_symmetries) -> None:
# Objective.
obj = model.new_int_var(0, num_slabs * max_loss, "obj")
losses = [model.new_int_var(0, max_loss, "loss_%i" % s) for s in all_slabs]
losses = [model.new_int_var(0, max_loss, f"loss_{s}") for s in all_slabs]
for s in all_slabs:
model.add_element(loads[s], loss_array, losses[s])
model.add(obj == sum(losses))
@@ -303,14 +300,19 @@ def steel_mill_slab(problem, break_symmetries) -> None:
### Output the solution.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
print(
"Loss = %i, time = %f s, %i conflicts"
% (solver.objective_value, solver.wall_time, solver.num_conflicts)
f"Loss = {solver.objective_value}, time = {solver.wall_time} s,"
f" {solver.num_conflicts} conflicts"
)
else:
print("No solution")
def collect_valid_slabs_dp(capacities, colors, widths, loss_array):
def collect_valid_slabs_dp(
capacities: list[int],
colors: list[int],
widths: list[int],
loss_array: list[int],
) -> list[list[int]]:
"""Collect valid columns (assign, loss) for one slab."""
start_time = time.time()
@@ -339,8 +341,8 @@ def collect_valid_slabs_dp(capacities, colors, widths, loss_array):
all_valid_assignments.extend(new_assignments)
print(
"%i assignments created in %.2f s"
% (len(all_valid_assignments), time.time() - start_time)
f"{len(all_valid_assignments)} assignments created in"
f" {time.time() - start_time:2f} s"
)
tuples = []
for assignment in all_valid_assignments:
@@ -354,10 +356,10 @@ def collect_valid_slabs_dp(capacities, colors, widths, loss_array):
return tuples
def steel_mill_slab_with_valid_slabs(problem, break_symmetries):
def steel_mill_slab_with_valid_slabs(problem_id: int, break_symmetries: bool) -> None:
"""Solves the Steel Mill Slab Problem."""
### Load problem.
(num_slabs, capacities, num_colors, orders) = build_problem(problem)
(num_slabs, capacities, num_colors, orders) = build_problem(problem_id)
num_orders = len(orders)
num_capacities = len(capacities)
@@ -365,8 +367,8 @@ def steel_mill_slab_with_valid_slabs(problem, break_symmetries):
all_colors = range(num_colors)
all_orders = range(len(orders))
print(
"Solving steel mill with %i orders, %i slabs, and %i capacities"
% (num_orders, num_slabs, num_capacities - 1)
f"Solving steel mill with {num_orders} orders, {num_slabs} slabs, and"
f" {num_capacities - 1} capacities"
)
# Compute auxiliary data.
@@ -383,11 +385,11 @@ def steel_mill_slab_with_valid_slabs(problem, break_symmetries):
# Create the model and the decision variables.
model = cp_model.CpModel()
assign = [
[model.new_bool_var("assign_%i_to_slab_%i" % (o, s)) for s in all_slabs]
[model.new_bool_var(r"assign_{o}_to_slab_{s}") for s in all_slabs]
for o in all_orders
]
loads = [model.new_int_var(0, max_capacity, "load_%i" % s) for s in all_slabs]
losses = [model.new_int_var(0, max_loss, "loss_%i" % s) for s in all_slabs]
loads = [model.new_int_var(0, max_capacity, f"load_{s}") for s in all_slabs]
losses = [model.new_int_var(0, max_loss, f"loss_{s}") for s in all_slabs]
unsorted_valid_slabs = collect_valid_slabs_dp(
capacities, colors, widths, loss_array
@@ -449,19 +451,19 @@ def steel_mill_slab_with_valid_slabs(problem, break_symmetries):
# Create position variables if there are symmetries to be broken.
if ordered_equivalent_orders:
print(
" - creating %i symmetry breaking constraints"
% len(ordered_equivalent_orders)
f" - creating {len(ordered_equivalent_orders)} symmetry breaking"
" constraints"
)
positions = {}
for p in ordered_equivalent_orders:
if p[0] not in positions:
positions[p[0]] = model.new_int_var(
0, num_slabs - 1, "position_of_slab_%i" % p[0]
0, num_slabs - 1, f"position_of_slab_{p[0]}"
)
model.add_map_domain(positions[p[0]], assign[p[0]])
if p[1] not in positions:
positions[p[1]] = model.new_int_var(
0, num_slabs - 1, "position_of_slab_%i" % p[1]
0, num_slabs - 1, f"position_of_slab_{p[1]}"
)
model.add_map_domain(positions[p[1]], assign[p[1]])
# Finally add the symmetry breaking constraint.
@@ -483,24 +485,24 @@ def steel_mill_slab_with_valid_slabs(problem, break_symmetries):
### Output the solution.
if status == cp_model.OPTIMAL:
print(
"Loss = %i, time = %.2f s, %i conflicts"
% (solver.objective_value, solver.wall_time, solver.num_conflicts)
f"Loss = {solver.objective_value}, time = {solver.wall_time:2f} s,"
f" {solver.num_conflicts} conflicts"
)
else:
print("No solution")
def steel_mill_slab_with_column_generation(problem):
def steel_mill_slab_with_column_generation(problem_id: int) -> None:
"""Solves the Steel Mill Slab Problem."""
### Load problem.
(num_slabs, capacities, _, orders) = build_problem(problem)
(num_slabs, capacities, _, orders) = build_problem(problem_id)
num_orders = len(orders)
num_capacities = len(capacities)
all_orders = range(len(orders))
print(
"Solving steel mill with %i orders, %i slabs, and %i capacities"
% (num_orders, num_slabs, num_capacities - 1)
f"Solving steel mill with {num_orders} orders, {num_slabs} slabs, and"
f" {num_capacities - 1} capacities"
)
# Compute auxiliary data.
@@ -524,7 +526,7 @@ def steel_mill_slab_with_column_generation(problem):
# create model and decision variables.
model = cp_model.CpModel()
selected = [model.new_bool_var("selected_%i" % i) for i in all_valid_slabs]
selected = [model.new_bool_var(f"selected_{i}") for i in all_valid_slabs]
for order_id in all_orders:
model.add(
@@ -552,8 +554,8 @@ def steel_mill_slab_with_column_generation(problem):
### Output the solution.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
print(
"Loss = %i, time = %.2f s, %i conflicts"
% (solver.objective_value, solver.wall_time, solver.num_conflicts)
f"Loss = {solver.objective_value}, time = {solver.wall_time:2f} s,"
f" {solver.num_conflicts} conflicts"
)
else:
print("No solution")

View File

@@ -246,9 +246,9 @@ def task_allocation_sat() -> None:
assign = {}
for task in all_tasks:
for slot in all_slots:
assign[(task, slot)] = model.new_bool_var("x[%i][%i]" % (task, slot))
assign[(task, slot)] = model.new_bool_var(f"x[{task}][{slot}]")
count = model.new_int_var(0, nslots, "count")
slot_used = [model.new_bool_var("slot_used[%i]" % s) for s in all_slots]
slot_used = [model.new_bool_var(f"slot_used[{s}]") for s in all_slots]
for task in all_tasks:
model.add(
@@ -285,12 +285,7 @@ def task_allocation_sat() -> None:
# Uses the portfolion of heuristics.
solver.parameters.log_search_progress = True
solver.parameters.num_search_workers = 16
status = solver.solve(model)
print("Statistics")
print(" - status =", solver.status_name(status))
print(" - optimal solution =", solver.objective_value)
print(" - wall time : %f s" % solver.wall_time)
solver.solve(model)
def main(argv: Sequence[str]) -> None:

View File

@@ -76,7 +76,9 @@ def build_data() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
def solve(
tests_data: pd.DataFrame, operator_data: pd.DataFrame, supplies_data: pd.DataFrame
tests_data: pd.DataFrame,
operator_data: pd.DataFrame,
supplies_data: pd.DataFrame,
) -> None:
"""Solve the scheduling of tests problem."""