Files
ortools-clone/examples/python/flexible_job_shop_sat.py
2023-07-01 06:06:53 +02:00

209 lines
7.5 KiB
Python

#!/usr/bin/env python3
# Copyright 2010-2022 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Solves a flexible jobshop problems with the CP-SAT solver.
A jobshop is a standard scheduling problem when you must sequence a
series of task_types on a set of machines. Each job contains one task_type per
machine. The order of execution and the length of each job on each
machine is task_type dependent.
The objective is to minimize the maximum completion time of all
jobs. This is called the makespan.
"""
# overloaded sum() clashes with pytype.
import collections
from ortools.sat.python import cp_model
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self):
cp_model.CpSolverSolutionCallback.__init__(self)
self.__solution_count = 0
def on_solution_callback(self):
"""Called at each new solution."""
print(
"Solution %i, time = %f s, objective = %i"
% (self.__solution_count, self.WallTime(), self.ObjectiveValue())
)
self.__solution_count += 1
def flexible_jobshop():
"""Solve a small flexible jobshop problem."""
# Data part.
jobs = [ # task = (processing_time, machine_id)
[ # Job 0
[(3, 0), (1, 1), (5, 2)], # task 0 with 3 alternatives
[(2, 0), (4, 1), (6, 2)], # task 1 with 3 alternatives
[(2, 0), (3, 1), (1, 2)], # task 2 with 3 alternatives
],
[ # Job 1
[(2, 0), (3, 1), (4, 2)],
[(1, 0), (5, 1), (4, 2)],
[(2, 0), (1, 1), (4, 2)],
],
[ # Job 2
[(2, 0), (1, 1), (4, 2)],
[(2, 0), (3, 1), (4, 2)],
[(3, 0), (1, 1), (5, 2)],
],
]
num_jobs = len(jobs)
all_jobs = range(num_jobs)
num_machines = 3
all_machines = range(num_machines)
# Model the flexible jobshop problem.
model = cp_model.CpModel()
horizon = 0
for job in jobs:
for task in job:
max_task_duration = 0
for alternative in task:
max_task_duration = max(max_task_duration, alternative[0])
horizon += max_task_duration
print("Horizon = %i" % horizon)
# Global storage of variables.
intervals_per_resources = collections.defaultdict(list)
starts = {} # indexed by (job_id, task_id).
presences = {} # indexed by (job_id, task_id, alt_id).
job_ends = []
# Scan the jobs and create the relevant variables and intervals.
for job_id in all_jobs:
job = jobs[job_id]
num_tasks = len(job)
previous_end = None
for task_id in range(num_tasks):
task = job[task_id]
min_duration = task[0][0]
max_duration = task[0][0]
num_alternatives = len(task)
all_alternatives = range(num_alternatives)
for alt_id in range(1, num_alternatives):
alt_duration = task[alt_id][0]
min_duration = min(min_duration, alt_duration)
max_duration = max(max_duration, alt_duration)
# Create main interval for the task.
suffix_name = "_j%i_t%i" % (job_id, task_id)
start = model.NewIntVar(0, horizon, "start" + suffix_name)
duration = model.NewIntVar(
min_duration, max_duration, "duration" + suffix_name
)
end = model.NewIntVar(0, horizon, "end" + suffix_name)
interval = model.NewIntervalVar(
start, duration, end, "interval" + suffix_name
)
# Store the start for the solution.
starts[(job_id, task_id)] = start
# Add precedence with previous task in the same job.
if previous_end is not None:
model.Add(start >= previous_end)
previous_end = end
# Create alternative intervals.
if num_alternatives > 1:
l_presences = []
for alt_id in all_alternatives:
alt_suffix = "_j%i_t%i_a%i" % (job_id, task_id, alt_id)
l_presence = model.NewBoolVar("presence" + alt_suffix)
l_start = model.NewIntVar(0, horizon, "start" + alt_suffix)
l_duration = task[alt_id][0]
l_end = model.NewIntVar(0, horizon, "end" + alt_suffix)
l_interval = model.NewOptionalIntervalVar(
l_start, l_duration, l_end, l_presence, "interval" + alt_suffix
)
l_presences.append(l_presence)
# Link the primary/global variables with the local ones.
model.Add(start == l_start).OnlyEnforceIf(l_presence)
model.Add(duration == l_duration).OnlyEnforceIf(l_presence)
model.Add(end == l_end).OnlyEnforceIf(l_presence)
# Add the local interval to the right machine.
intervals_per_resources[task[alt_id][1]].append(l_interval)
# Store the presences for the solution.
presences[(job_id, task_id, alt_id)] = l_presence
# Select exactly one presence variable.
model.AddExactlyOne(l_presences)
else:
intervals_per_resources[task[0][1]].append(interval)
presences[(job_id, task_id, 0)] = model.NewConstant(1)
job_ends.append(previous_end)
# Create machines constraints.
for machine_id in all_machines:
intervals = intervals_per_resources[machine_id]
if len(intervals) > 1:
model.AddNoOverlap(intervals)
# Makespan objective
makespan = model.NewIntVar(0, horizon, "makespan")
model.AddMaxEquality(makespan, job_ends)
model.Minimize(makespan)
# Solve model.
solver = cp_model.CpSolver()
solution_printer = SolutionPrinter()
status = solver.Solve(model, solution_printer)
# Print final solution.
for job_id in all_jobs:
print("Job %i:" % job_id)
for task_id in range(len(jobs[job_id])):
start_value = solver.Value(starts[(job_id, task_id)])
machine = -1
duration = -1
selected = -1
for alt_id in range(len(jobs[job_id][task_id])):
if solver.Value(presences[(job_id, task_id, alt_id)]):
duration = jobs[job_id][task_id][alt_id][0]
machine = jobs[job_id][task_id][alt_id][1]
selected = alt_id
print(
" task_%i_%i starts at %i (alt %i, machine %i, duration %i)"
% (job_id, task_id, start_value, selected, machine, duration)
)
print("Solve status: %s" % solver.StatusName(status))
print("Optimal objective value: %i" % solver.ObjectiveValue())
print("Statistics")
print(" - conflicts : %i" % solver.NumConflicts())
print(" - branches : %i" % solver.NumBranches())
print(" - wall time : %f s" % solver.WallTime())
flexible_jobshop()