routing: Improve pybind11 binding

This commit is contained in:
Mizux Seiha
2024-06-26 00:57:15 +02:00
parent e284ae4621
commit c869dd1d82
5 changed files with 785 additions and 31 deletions

View File

@@ -540,7 +540,7 @@ add_custom_command(
COMMAND ${stubgen_EXECUTABLE} -p ortools.math_opt.core.python.solver --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.pdlp.python.pdlp --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.routing.pywraprouting --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.routing.python.routing --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.routing.python.model --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.sat.python.swig_helper --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.scheduling.python.rcpsp --output .
COMMAND ${stubgen_EXECUTABLE} -p ortools.util.python.sorted_interval_list --output .

View File

@@ -126,6 +126,9 @@ PYBIND11_MODULE(constraint_solver, m) {
.def(pybind11::init<const std::string&,
const ConstraintSolverParameters&>())
.def("__str__", &Solver::DebugString)
.def("default_solver_parameters", &Solver::DefaultSolverParameters)
.def("parameters", &Solver::parameters)
.def("local_search_profile", &Solver::LocalSearchProfile)
.def("new_int_var",
pybind11::overload_cast<int64_t, int64_t, const std::string&>(
&Solver::MakeIntVar),

View File

@@ -11,8 +11,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iterator>
#include <utility>
#include "ortools/constraint_solver/constraint_solver.h"
@@ -30,8 +32,13 @@
#include "pybind11/stl.h"
#include "pybind11_protobuf/native_proto_caster.h"
using ::operations_research::Assignment;
using ::operations_research::DefaultRoutingModelParameters;
using ::operations_research::DefaultRoutingSearchParameters;
using ::operations_research::RoutingIndexManager;
using ::operations_research::RoutingDimension;
using ::operations_research::RoutingModelParameters;
using ::operations_research::RoutingSearchParameters;
using ::operations_research::RoutingModel;
using ::pybind11::arg;
@@ -41,6 +48,9 @@ PYBIND11_MODULE(model, m) {
pybind11::module::import(
"ortools.constraint_solver.python.constraint_solver");
m.def("default_routing_model_parameters", &DefaultRoutingModelParameters,
DOC(operations_research, DefaultRoutingModelParameters));
m.def("default_routing_search_parameters", &DefaultRoutingSearchParameters,
DOC(operations_research, DefaultRoutingSearchParameters));
@@ -52,6 +62,23 @@ PYBIND11_MODULE(model, m) {
RoutingIndexManager::NodeIndex(depot));
}),
DOC(operations_research, RoutingIndexManager, RoutingIndexManager))
.def(pybind11::init([](int num_nodes, int num_vehicles,
const std::vector<int> starts,
const std::vector<int> ends) {
std::vector<RoutingIndexManager::NodeIndex> start_nodes;
start_nodes.reserve(starts.size());
std::transform(starts.cbegin(), starts.cend(), std::back_inserter(start_nodes),
[](int node){return RoutingIndexManager::NodeIndex(node);});
std::vector<RoutingIndexManager::NodeIndex> end_nodes;
end_nodes.reserve(ends.size());
std::transform(ends.cbegin(), ends.cend(), std::back_inserter(end_nodes),
[](int node){return RoutingIndexManager::NodeIndex(node);});
return new RoutingIndexManager(
num_nodes, num_vehicles, start_nodes, end_nodes);
}),
DOC(operations_research, RoutingIndexManager, RoutingIndexManager))
.def("num_nodes", &RoutingIndexManager::num_nodes,
DOC(operations_research, RoutingIndexManager, num_nodes))
.def("num_vehicles", &RoutingIndexManager::num_vehicles,
@@ -76,33 +103,146 @@ PYBIND11_MODULE(model, m) {
.def("get_end_index", &RoutingIndexManager::GetEndIndex,
DOC(operations_research, RoutingIndexManager, GetEndIndex));
pybind11::class_<RoutingModel>(m, "RoutingModel")
.def(pybind11::init([](const RoutingIndexManager& routing_index_manager) {
return new RoutingModel(routing_index_manager);
}))
.def("register_transit_callback",
pybind11::class_<RoutingDimension>(m, "RoutingDimension")
.def("model", &RoutingDimension::model,
pybind11::return_value_policy::reference_internal)
.def("get_transit_value", &RoutingDimension::GetTransitValue,
arg("from_index"), arg("to_index"), arg("vehicle"))
.def("cumul_var", &RoutingDimension::CumulVar,
pybind11::return_value_policy::reference_internal, arg("index"));
pybind11::class_<RoutingModel> rm(m, "RoutingModel");
rm.def(pybind11::init([](const RoutingIndexManager& index_manager) {
return new RoutingModel(index_manager);
}));
rm.def(pybind11::init([](
const RoutingIndexManager& index_manager,
const RoutingModelParameters& parameters) {
return new RoutingModel(index_manager, parameters);
}));
rm.def("register_transit_matrix",
[](RoutingModel* routing_model,
std::vector<std::vector<int64_t>> transit_matrix) {
return routing_model->RegisterTransitMatrix(
std::move(transit_matrix));
});
rm.def("register_unary_transit_vector",
[](RoutingModel* routing_model,
std::vector<int64_t> transit_vector) {
return routing_model->RegisterUnaryTransitVector(
std::move(transit_vector));
});
rm.def("register_unary_transit_callback",
[](RoutingModel* routing_model,
std::function<int64_t(int64_t)> transit_callback) {
return routing_model->RegisterUnaryTransitCallback(
std::move(transit_callback));
});
rm.def("register_transit_callback",
[](RoutingModel* routing_model,
std::function<int64_t(int64_t, int64_t)> transit_callback) {
return routing_model->RegisterTransitCallback(
std::move(transit_callback));
})
.def("set_arc_cost_evaluator_of_all_vehicles",
});
rm.def("set_arc_cost_evaluator_of_all_vehicles",
&RoutingModel::SetArcCostEvaluatorOfAllVehicles,
arg("transit_callback_index"))
.def("solve", &RoutingModel::Solve,
arg("transit_callback_index"));
rm.def("add_dimension", &RoutingModel::AddDimension,
arg("evaluator_index"),
arg("slack_max"),
arg("capacity"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_dimension_with_vehicle_capacity", &RoutingModel::AddDimensionWithVehicleCapacity,
arg("evaluator_index"),
arg("slack_max"),
arg("vehicle_capacities"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_dimension_with_vehicle_transits", &RoutingModel::AddDimensionWithVehicleTransits,
arg("evaluator_indices"),
arg("slack_max"),
arg("capacity"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_dimension_with_vehicle_transit_and_capacity", &RoutingModel::AddDimensionWithVehicleTransitAndCapacity,
arg("evaluator_indices"),
arg("slack_max"),
arg("vehicle_capacities"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_constant_dimension", &RoutingModel::AddConstantDimension,
arg("value"),
arg("capacity"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_vector_dimension", &RoutingModel::AddVectorDimension,
arg("values"),
arg("capacity"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("add_matrix_dimension", &RoutingModel::AddMatrixDimension,
arg("values"),
arg("capacity"),
arg("fix_start_cumul_to_zero"),
arg("name"));
rm.def("get_dimension_or_die", &RoutingModel::GetDimensionOrDie,
pybind11::return_value_policy::reference_internal,
arg("assignment") = nullptr)
.def("solve_with_parameters", &RoutingModel::SolveWithParameters,
arg("dimension_name"));
rm.def("close_model", &RoutingModel::CloseModel);
rm.def("close_model_with_parameters", &RoutingModel::CloseModelWithParameters,
arg("search_parameters"));
rm.def("solve", &RoutingModel::Solve,
pybind11::return_value_policy::reference_internal,
arg("search_parameters"), arg("solutions") = nullptr)
.def("status", &RoutingModel::status)
.def("start", &RoutingModel::Start, arg("vehicle"))
.def("end", &RoutingModel::End, arg("vehicle"))
.def("is_start", &RoutingModel::IsStart, arg("index"))
.def("is_end", &RoutingModel::IsEnd, arg("index"))
.def("next", &RoutingModel::Next, arg("assignment"), arg("index"))
.def("next_var", &RoutingModel::NextVar,
pybind11::return_value_policy::reference_internal, arg("index"))
.def("get_arc_cost_for_vehicle", &RoutingModel::GetArcCostForVehicle,
arg("assignment") = nullptr);
// TODO(mizux) Add support for solutions parameters too.
rm.def("solve_with_parameters",
[](RoutingModel* routing_model
,const RoutingSearchParameters& search_parameters
/*,std::vector<const Assignment*>* solutions = nullptr*/) -> const Assignment* {
return routing_model->SolveWithParameters(search_parameters, nullptr);
}
,pybind11::return_value_policy::reference_internal
,arg("search_parameters")
//, arg("solutions") = nullptr
);
rm.def("status", &RoutingModel::status);
rm.def("nodes", &RoutingModel::nodes);
rm.def("vehicles", &RoutingModel::vehicles);
rm.def("size", &RoutingModel::Size);
rm.def("start", &RoutingModel::Start, arg("vehicle"));
rm.def("end", &RoutingModel::End, arg("vehicle"));
rm.def("is_start", &RoutingModel::IsStart, arg("index"));
rm.def("is_end", &RoutingModel::IsEnd, arg("index"));
rm.def("next", &RoutingModel::Next, arg("assignment"), arg("index"));
rm.def("next_var", &RoutingModel::NextVar,
pybind11::return_value_policy::reference_internal, arg("index"));
rm.def("get_arc_cost_for_vehicle", &RoutingModel::GetArcCostForVehicle,
arg("from_index"), arg("to_index"), arg("vehicle"));
rm.def("solver", &RoutingModel::solver,
pybind11::return_value_policy::reference_internal);
pybind11::enum_<RoutingModel::PenaltyCostBehavior>(rm, "PenaltyCostBehavior")
.value("PENALIZE_ONCE", RoutingModel::PenaltyCostBehavior::PENALIZE_ONCE)
.value("PENALIZE_PER_INACTIVE", RoutingModel::PenaltyCostBehavior::PENALIZE_PER_INACTIVE)
.export_values();
rm.def("add_disjunction",
[](RoutingModel* routing_model,
const std::vector<int64_t>& indices,
int64_t penalty,
int64_t max_cardinality,
RoutingModel::PenaltyCostBehavior penalty_cost_behavior) -> int {
return static_cast<int>(routing_model->AddDisjunction(
indices,
penalty,
max_cardinality,
penalty_cost_behavior).value());
},
//&RoutingModel::AddDisjunction,
arg("indices"),
arg("penalty") = RoutingModel::kNoPenalty,
arg("max_cardinality") = 1,
arg("penalty_cost_behavior") = RoutingModel::PenaltyCostBehavior::PENALIZE_ONCE);
}

View File

@@ -14,13 +14,17 @@
"""Test for routing pybind11 layer."""
import functools
from absl.testing import absltest
from ortools.constraint_solver.python import constraint_solver
from ortools.routing import enums_pb2
from ortools.routing import parameters_pb2
from ortools.routing.python import model
FirstSolutionStrategy = enums_pb2.FirstSolutionStrategy
RoutingSearchStatus = enums_pb2.RoutingSearchStatus
RoutingSearchParameters = parameters_pb2.RoutingSearchParameters
def Distance(node_i, node_j):
return node_i + node_j
@@ -34,11 +38,22 @@ def UnaryTransitDistance(manager, i):
return Distance(manager.index_to_node(i), 0)
def One(unused_i, unused_j):
return 1
def Two(unused_i, unused_j):
return 1
def Three(unused_i, unused_j):
return 1
class TestRoutingIndexManager(absltest.TestCase):
def test_create_index_manager(self):
print("test_create_index_manager")
def testCtor(self):
manager = model.RoutingIndexManager(42, 3, 7)
self.assertIsNotNone(manager)
print(manager)
@@ -49,16 +64,612 @@ class TestRoutingIndexManager(absltest.TestCase):
self.assertEqual(7, manager.index_to_node(manager.get_start_index(i)))
self.assertEqual(7, manager.index_to_node(manager.get_end_index(i)))
def testCtorMultiDepotSame(self):
manager = model.RoutingIndexManager(42, 3, [0, 0, 0], [0, 0, 0])
self.assertIsNotNone(manager)
print(manager)
self.assertEqual(42, manager.num_nodes())
self.assertEqual(3, manager.num_vehicles())
self.assertEqual(42 + 3 * 2 - 1, manager.num_indices())
for i in range(manager.num_vehicles()):
self.assertEqual(0, manager.index_to_node(manager.get_start_index(i)))
self.assertEqual(0, manager.index_to_node(manager.get_end_index(i)))
def testCtorMultiDepotAllDiff(self):
manager = model.RoutingIndexManager(42, 3, [1, 2, 3], [4, 5, 6])
self.assertIsNotNone(manager)
print(manager)
self.assertEqual(42, manager.num_nodes())
self.assertEqual(3, manager.num_vehicles())
self.assertEqual(42, manager.num_indices())
for i in range(manager.num_vehicles()):
self.assertEqual(i + 1, manager.index_to_node(manager.get_start_index(i)))
self.assertEqual(i + 4, manager.index_to_node(manager.get_end_index(i)))
class ModelTest(absltest.TestCase):
def test_create_model(self):
print("test_create_model")
def testCtor(self):
manager = model.RoutingIndexManager(42, 3, 7)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
print(routing_model)
for i in range(manager.num_vehicles()):
self.assertEqual(7, manager.index_to_node(routing_model.start(i)))
self.assertEqual(7, manager.index_to_node(routing_model.end(i)))
def testSolve(self):
manager = model.RoutingIndexManager(42, 3, 7)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertEqual(RoutingSearchStatus.ROUTING_OPTIMAL, routing_model.status())
self.assertIsNotNone(assignment)
self.assertEqual(0, assignment.objective_value())
def testSolveMultiDepot(self):
manager = model.RoutingIndexManager(42, 3, [1, 2, 3], [4, 5, 6])
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertEqual(RoutingSearchStatus.ROUTING_OPTIMAL, routing_model.status())
self.assertIsNotNone(assignment)
self.assertEqual(0, assignment.objective_value())
def testTransitCallback(self):
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
self.assertEqual(1, transit_idx)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertTrue(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(20, assignment.objective_value())
def testTransitLambda(self):
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
transit_id = routing_model.register_transit_callback(lambda from_index, to_index: 1)
self.assertEqual(1, transit_id)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_id)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertIsNotNone(assignment)
self.assertEqual(5, assignment.objective_value())
def testTransitMatrix(self):
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
matrix = [[i + 1 for i in range(5)] for _ in range(5)]
transit_idx = routing_model.register_transit_matrix(matrix)
self.assertEqual(1, transit_idx)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertTrue(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(15, assignment.objective_value())
def testUnaryTransitCallback(self):
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
transit_idx = routing_model.register_unary_transit_callback(
functools.partial(UnaryTransitDistance, manager)
)
self.assertEqual(1, transit_idx)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertTrue(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(10, assignment.objective_value())
def testUnaryTransitLambda(self):
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
transit_id = routing_model.register_unary_transit_callback(lambda from_index: 1)
self.assertEqual(1, transit_id)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_id)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertIsNotNone(assignment)
self.assertEqual(5, assignment.objective_value())
def testUnaryTransitVector(self):
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
vector = list(range(10))
transit_idx = routing_model.register_unary_transit_vector(vector)
self.assertEqual(1, transit_idx)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve()
self.assertTrue(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(45, assignment.objective_value())
def testTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(90, assignment.objective_value())
# Inspect solution
index = routing_model.start(0)
visited_nodes = []
expected_visited_nodes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
while not routing_model.is_end(index):
index = assignment.value(routing_model.next_var(index))
visited_nodes.append(manager.index_to_node(index))
self.assertEqual(expected_visited_nodes, visited_nodes)
def testVRP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 2, [0, 1], [1, 0])
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(89, assignment.objective_value())
# Inspect solution
index = routing_model.start(1)
visited_nodes = []
expected_visited_nodes = [2, 4, 6, 8, 3, 5, 7, 9, 0]
while not routing_model.is_end(index):
index = assignment.value(routing_model.next_var(index))
visited_nodes.append(manager.index_to_node(index))
self.assertEqual(expected_visited_nodes, visited_nodes)
self.assertTrue(routing_model.is_end(assignment.value(routing_model.next_var(routing_model.start(0)))))
def testDimensionTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add generic dimension
routing_model.add_dimension(transit_idx, 90, 90, True, "distance")
distance_dimension = routing_model.get_dimension_or_die("distance")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(90, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
cumul = 0
while not routing_model.is_end(node):
self.assertEqual(cumul, assignment.value(distance_dimension.cumul_var(node)))
next_node = assignment.value(routing_model.next_var(node))
cumul += Distance(node, next_node)
node = next_node
def testDimensionWithVehicleCapacitiesTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add generic dimension
routing_model.add_dimension_with_vehicle_capacity(transit_idx, 90, [90], True, "distance")
distance_dimension = routing_model.get_dimension_or_die("distance")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(90, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
cumul = 0
while not routing_model.is_end(node):
self.assertEqual(cumul, assignment.value(distance_dimension.cumul_var(node)))
next_node = assignment.value(routing_model.next_var(node))
cumul += Distance(node, next_node)
node = next_node
def testDimensionWithVehicleTransitsTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add generic dimension
routing_model.add_dimension_with_vehicle_transits([transit_idx], 90, 90, True, "distance")
distance_dimension = routing_model.get_dimension_or_die("distance")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(90, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
cumul = 0
while not routing_model.is_end(node):
self.assertEqual(cumul, assignment.value(distance_dimension.cumul_var(node)))
next_node = assignment.value(routing_model.next_var(node))
cumul += Distance(node, next_node)
node = next_node
def testDimensionWithVehicleTransitsVRP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 3, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add generic dimension
distances = [
routing_model.register_transit_callback(One),
routing_model.register_transit_callback(Two),
routing_model.register_transit_callback(Three),
]
routing_model.add_dimension_with_vehicle_transits(distances, 90, 90, True, "distance")
distance_dimension = routing_model.get_dimension_or_die("distance")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(90, assignment.objective_value())
# Inspect solution
for vehicle in range(0, routing_model.vehicles()):
node = routing_model.start(vehicle)
cumul = 0
while not routing_model.is_end(node):
self.assertEqual(
cumul, assignment.min(distance_dimension.cumul_var(node))
)
next_node = assignment.value(routing_model.next_var(node))
# Increment cumul by the vehicle distance which is equal to the vehicle
# index + 1, cf. distances.
cumul += vehicle + 1
node = next_node
def testConstantDimensionTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 3, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add constant dimension
constant_id, success = routing_model.add_constant_dimension(1, 100, True, "count")
self.assertTrue(success)
self.assertEqual(transit_idx + 1, constant_id)
count_dimension = routing_model.get_dimension_or_die("count")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(90, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
count = 0
while not routing_model.is_end(node):
self.assertEqual(count, assignment.value(count_dimension.cumul_var(node)))
count += 1
node = assignment.value(routing_model.next_var(node))
self.assertEqual(10, count)
def testVectorDimensionTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add vector dimension
values = list(range(10))
unary_transit_id, success = routing_model.add_vector_dimension(
values, 100, True, "vector"
)
self.assertTrue(success)
self.assertEqual(transit_idx + 1, unary_transit_id)
vector_dimension = routing_model.get_dimension_or_die("vector")
# Solve
search_parameters: RoutingSearchParameters = (
model.default_routing_search_parameters()
)
self.assertIsNotNone(search_parameters)
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertIsNotNone(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(90, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
cumul = 0
while not routing_model.is_end(node):
self.assertEqual(cumul, assignment.value(vector_dimension.cumul_var(node)))
cumul += values[node]
node = assignment.value(routing_model.next_var(node))
def testMatrixDimensionTSP(self):
# Create routing model
manager = model.RoutingIndexManager(5, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
cost = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(cost)
# Add matrix dimension
values = [[j for _ in range(5)] for j in range(5)]
transit_id, success = routing_model.add_matrix_dimension(values, 100, True, "matrix")
self.assertTrue(success)
self.assertEqual(cost + 1, transit_id)
dimension = routing_model.get_dimension_or_die("matrix")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertIsNotNone(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(20, assignment.objective_value())
# Inspect solution
index = routing_model.start(0)
cumul = 0
while not routing_model.is_end(index):
self.assertEqual(cumul, assignment.value(dimension.cumul_var(index)))
cumul += values[manager.index_to_node(index)][manager.index_to_node(index)]
index = assignment.value(routing_model.next_var(index))
def testMatrixDimensionVRP(self):
manager = model.RoutingIndexManager(5, 2, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
matrix = [[i + j for i in range(5)] for j in range(5)]
transit_idx = routing_model.register_transit_matrix(matrix)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add matrix dimension
matrix_transit_idx, success = routing_model.add_matrix_dimension(
matrix, 10, True, "matrix" # capacity # fix_start_cumul_to_zero
)
self.assertTrue(success)
self.assertEqual(transit_idx + 1, matrix_transit_idx)
dimension = routing_model.get_dimension_or_die("matrix")
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
self.assertEqual(
RoutingSearchStatus.ROUTING_NOT_SOLVED, routing_model.status()
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertIsNotNone(assignment)
self.assertEqual(RoutingSearchStatus.ROUTING_SUCCESS, routing_model.status())
self.assertEqual(20, assignment.objective_value())
# Inspect solution
for v in range(manager.num_vehicles()):
index = routing_model.start(v)
cumul = 0
while not routing_model.is_end(index):
self.assertEqual(cumul, assignment.value(dimension.cumul_var(index)))
prev_index = index
index = assignment.value(routing_model.next_var(index))
cumul += matrix[manager.index_to_node(prev_index)][
manager.index_to_node(index)
]
def testDisjunctionTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add disjunctions
disjunctions = [
[manager.node_to_index(1), manager.node_to_index(2)],
[manager.node_to_index(3)],
[manager.node_to_index(4)],
[manager.node_to_index(5)],
[manager.node_to_index(6)],
[manager.node_to_index(7)],
[manager.node_to_index(8)],
[manager.node_to_index(9)],
]
for disjunction in disjunctions:
routing_model.add_disjunction(disjunction)
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(86, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
count = 0
while not routing_model.is_end(node):
count += 1
node = assignment.value(routing_model.next_var(node))
self.assertEqual(9, count)
def testDisjunctionPenaltyTSP(self):
# Create routing model
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager)
self.assertIsNotNone(routing_model)
# Add cost function
transit_idx = routing_model.register_transit_callback(
functools.partial(TransitDistance, manager)
)
routing_model.set_arc_cost_evaluator_of_all_vehicles(transit_idx)
# Add disjunctions
disjunctions = [
([manager.node_to_index(1), manager.node_to_index(2)], 1000),
([manager.node_to_index(3)], 1000),
([manager.node_to_index(4)], 1000),
([manager.node_to_index(5)], 1000),
([manager.node_to_index(6)], 1000),
([manager.node_to_index(7)], 1000),
([manager.node_to_index(8)], 1000),
([manager.node_to_index(9)], 0),
]
for disjunction, penalty in disjunctions:
routing_model.add_disjunction(disjunction, penalty)
# Solve
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = (
FirstSolutionStrategy.FIRST_UNBOUND_MIN_VALUE
)
assignment = routing_model.solve_with_parameters(search_parameters)
self.assertEqual(68, assignment.objective_value())
# Inspect solution
node = routing_model.start(0)
count = 0
while not routing_model.is_end(node):
count += 1
node = assignment.value(routing_model.next_var(node))
self.assertEqual(8, count)
def testRoutingModelParameters(self):
# Create routing model with parameters
parameters = model.default_routing_model_parameters()
parameters.solver_parameters.CopyFrom(
constraint_solver.Solver.default_solver_parameters())
parameters.solver_parameters.trace_propagation = True
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager, parameters)
self.assertIsNotNone(routing_model)
self.assertEqual(1, routing_model.vehicles())
self.assertTrue(routing_model.solver().parameters().trace_propagation)
def testRoutingLocalSearchFiltering(self):
parameters = model.default_routing_model_parameters()
parameters.solver_parameters.profile_local_search = True
manager = model.RoutingIndexManager(10, 1, 0)
self.assertIsNotNone(manager)
routing_model = model.RoutingModel(manager, parameters)
self.assertIsNotNone(routing_model)
routing_model.solve()
profile = routing_model.solver().local_search_profile()
print(profile)
self.assertIsInstance(profile, str)
self.assertTrue(profile) # Verify it's not empty.
if __name__ == "__main__":

View File

@@ -26,6 +26,8 @@ from ortools.routing.python import model
FirstSolutionStrategy = enums_pb2.FirstSolutionStrategy
RoutingSearchStatus = enums_pb2.RoutingSearchStatus
RoutingSearchParameters = parameters_pb2.RoutingSearchParameters
# [END import]
@@ -140,16 +142,14 @@ def main():
# Setting first solution heuristic.
# [START parameters]
search_parameters: parameters_pb2.RoutingSearchParameters = (
model.default_routing_search_parameters()
)
search_parameters = model.default_routing_search_parameters()
search_parameters.first_solution_strategy = FirstSolutionStrategy.PATH_CHEAPEST_ARC
# [END parameters]
# Solve the problem.
# [START solve]
solution = routing.solve()
# solution = routing.solve_with_parameters(search_parameters)
#solution = routing.solve()
solution = routing.solve_with_parameters(search_parameters)
# [END solve]
# Print solution on console.