[ModelBuilder] Polish numpy code
This commit is contained in:
@@ -33,7 +33,7 @@ rather than for solving specific optimization problems.
|
||||
|
||||
import math
|
||||
import numbers
|
||||
from typing import Any, Callable, Dict, List, Optional, Union, Sequence, Tuple
|
||||
from typing import Any, Callable, Dict, List, Literal, Optional, Union, Sequence, Tuple
|
||||
import numpy as np
|
||||
from numpy import typing as npt
|
||||
from numpy.lib import mixins
|
||||
@@ -49,9 +49,10 @@ ConstraintT = Union['VarCompVar', 'BoundedLinearExpression', bool]
|
||||
ShapeT = Union[IntegerT, Sequence[IntegerT]]
|
||||
NumpyFuncT = Callable[[
|
||||
'VariableContainer',
|
||||
Optional[np.double],
|
||||
Optional[Union[npt.NDArray[np.double], Sequence[NumberT]]],
|
||||
Optional[Union[NumberT, npt.NDArray[np.number], Sequence[NumberT]]],
|
||||
], LinearExprT,]
|
||||
SliceT = Union[slice, int, List[int], 'ellipsis',
|
||||
Tuple[Union[int, slice, List[int], 'ellipsis'], ...],]
|
||||
|
||||
# Forward solve statuses.
|
||||
SolveStatus = pwmb.SolveStatus
|
||||
@@ -562,7 +563,7 @@ class Variable(LinearExpr):
|
||||
return LinearExpr.weighted_sum([self], [arg], constant=0.0)
|
||||
|
||||
|
||||
_REGISTERED_NUMPY_UFUNCS: Dict[Any, NumpyFuncT] = {}
|
||||
_REGISTERED_NUMPY_VARIABLE_FUNCS: Dict[Any, NumpyFuncT] = {}
|
||||
|
||||
|
||||
class VariableContainer(mixins.NDArrayOperatorsMixin):
|
||||
@@ -579,13 +580,12 @@ class VariableContainer(mixins.NDArrayOperatorsMixin):
|
||||
|
||||
def __getitem__(
|
||||
self,
|
||||
pos: Union[slice, int, List[int], Tuple[Union[int, slice, List[int]],
|
||||
...]],
|
||||
pos: SliceT,
|
||||
) -> Union['VariableContainer', Variable]:
|
||||
# delegate the treatment of the 'pos' query to __variable_indices.
|
||||
index_or_slice: Union[np.int32, npt.NDArray[np.int32]] = (
|
||||
self.__variable_indices[pos])
|
||||
if mbh.is_integral(index_or_slice):
|
||||
if np.isscalar(index_or_slice):
|
||||
return Variable(self.__helper, index_or_slice, None, None, None)
|
||||
else:
|
||||
return VariableContainer(self.__helper, index_or_slice)
|
||||
@@ -616,12 +616,6 @@ class VariableContainer(mixins.NDArrayOperatorsMixin):
|
||||
"""Returns the number of variables in the numpy array."""
|
||||
return self.__variable_indices.size
|
||||
|
||||
@property
|
||||
def ravel(self) -> 'VariableContainer':
|
||||
"""returns the flattened array of variables."""
|
||||
return VariableContainer(self.__helper, self.__variable_indices.ravel())
|
||||
|
||||
@property
|
||||
def flatten(self) -> 'VariableContainer':
|
||||
"""returns the flattened array of variables."""
|
||||
return VariableContainer(self.__helper,
|
||||
@@ -638,118 +632,84 @@ class VariableContainer(mixins.NDArrayOperatorsMixin):
|
||||
def __len__(self):
|
||||
return self.__variable_indices.shape[0]
|
||||
|
||||
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
|
||||
def __array_ufunc__(
|
||||
self,
|
||||
ufunc: np.ufunc,
|
||||
method: Literal['__call__', 'reduce', 'reduceat', 'accumulate', 'outer',
|
||||
'inner'],
|
||||
*inputs: Any,
|
||||
**kwargs: Any,
|
||||
) -> LinearExprT:
|
||||
if method != '__call__':
|
||||
return NotImplemented
|
||||
ufunc_impl = _REGISTERED_NUMPY_UFUNCS.get(ufunc)
|
||||
if ufunc_impl is None:
|
||||
function = _REGISTERED_NUMPY_VARIABLE_FUNCS.get(ufunc)
|
||||
if function is None:
|
||||
return NotImplemented
|
||||
container: Optional[VariableContainer] = None
|
||||
scalar: Optional[NumberT] = None
|
||||
coeffs: Optional[npt.NDArray[np.double]] = None
|
||||
if len(inputs) <= 2 and isinstance(inputs[0], VariableContainer):
|
||||
return function(*inputs, **kwargs)
|
||||
if len(inputs) == 2 and isinstance(inputs[1], VariableContainer):
|
||||
return function(inputs[1], inputs[0], **kwargs)
|
||||
return NotImplemented
|
||||
|
||||
for arg in inputs:
|
||||
if mbh.is_a_number(arg):
|
||||
scalar = mbh.assert_is_a_number(arg)
|
||||
elif isinstance(arg, self.__class__):
|
||||
container = arg
|
||||
elif isinstance(arg, (Sequence, np.ndarray)):
|
||||
coeffs = np.array(arg, dtype=np.double)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
if container is None:
|
||||
return NotImplemented
|
||||
if coeffs is not None:
|
||||
assert container.shape == coeffs.shape, (container.shape,
|
||||
coeffs.shape)
|
||||
return ufunc_impl(container, scalar, coeffs)
|
||||
|
||||
def __array_function__(self, func: Any, types: Any, args: Any,
|
||||
def __array_function__(self, func: Any, types: Any, inputs: Any,
|
||||
kwargs: Any) -> LinearExprT:
|
||||
if func not in _REGISTERED_NUMPY_UFUNCS:
|
||||
function = _REGISTERED_NUMPY_VARIABLE_FUNCS.get(func)
|
||||
if function is None:
|
||||
return NotImplemented
|
||||
return _REGISTERED_NUMPY_UFUNCS[func](*args, **kwargs)
|
||||
if len(inputs) <= 2 and isinstance(inputs[0], VariableContainer):
|
||||
return function(*inputs, **kwargs)
|
||||
if len(inputs) == 2 and isinstance(inputs[1], VariableContainer):
|
||||
return function(inputs[1], inputs[0], **kwargs)
|
||||
return NotImplemented
|
||||
|
||||
|
||||
def _implements(np_function: Any) -> Callable[[NumpyFuncT], NumpyFuncT]:
|
||||
"""Register an __array_function__ implementation for VariableContainer objects."""
|
||||
|
||||
def decorator(func: NumpyFuncT) -> NumpyFuncT:
|
||||
_REGISTERED_NUMPY_UFUNCS[np_function] = func
|
||||
_REGISTERED_NUMPY_VARIABLE_FUNCS[np_function] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@_implements(np.sum)
|
||||
def sum_variable_container(
|
||||
container: VariableContainer,
|
||||
scalar: Optional[np.double] = None,
|
||||
coeffs: Optional[npt.NDArray[np.double]] = None,
|
||||
) -> LinearExprT:
|
||||
def sum_variable_container(container: VariableContainer,
|
||||
constant: NumberT = 0.0) -> LinearExprT:
|
||||
"""Implementation of np.sum for VariableContainer objects."""
|
||||
assert coeffs is None
|
||||
indices: npt.NDArray[np.int32] = container.variable_indices
|
||||
constant = scalar if scalar is not None else np.double(0.0)
|
||||
return _WeightedSum(
|
||||
variable_indices=indices.flatten(),
|
||||
coefficients=np.ones(indices.size),
|
||||
constant=constant,
|
||||
constant=np.double(constant),
|
||||
)
|
||||
|
||||
|
||||
@_implements(np.multiply)
|
||||
def multiply_variable_container(
|
||||
container: VariableContainer,
|
||||
scalar: Optional[np.double] = None,
|
||||
coeffs: Optional[npt.NDArray[np.double]] = None,
|
||||
) -> LinearExprT:
|
||||
"""Implementation of np.multiply for VariableContainer objects."""
|
||||
indices: npt.NDArray[np.int32] = container.variable_indices
|
||||
if scalar is not None:
|
||||
assert coeffs is None
|
||||
return _WeightedSum(
|
||||
variable_indices=indices.flatten(),
|
||||
coefficients=np.full(indices.size, scalar),
|
||||
constant=0.0,
|
||||
)
|
||||
if coeffs is not None:
|
||||
assert container.shape == coeffs.shape, (container.shape, coeffs.shape)
|
||||
return _WeightedSum(
|
||||
variable_indices=indices.flatten(),
|
||||
coefficients=coeffs.flatten(),
|
||||
constant=0.0,
|
||||
)
|
||||
|
||||
raise ValueError('Cannot call multiply_variable_container without argument')
|
||||
|
||||
|
||||
@_implements(np.dot)
|
||||
def dot_variable_container(
|
||||
container: VariableContainer,
|
||||
scalar: Optional[np.double] = None,
|
||||
coeffs: Optional[npt.NDArray[np.double]] = None,
|
||||
arg: Union[np.double, npt.NDArray[np.double]],
|
||||
) -> LinearExprT:
|
||||
"""Implementation of np.dot for VariableContainer objects."""
|
||||
if len(container.shape) != 1:
|
||||
raise TypeError(
|
||||
'dot_variable_container only supports 1D variable containers')
|
||||
indices: npt.NDArray[np.int32] = container.variable_indices
|
||||
if coeffs is not None:
|
||||
assert scalar is None
|
||||
assert container.shape == coeffs.shape, (container.shape, coeffs.shape)
|
||||
if np.isscalar(arg):
|
||||
return _WeightedSum(
|
||||
variable_indices=indices.flatten(),
|
||||
coefficients=coeffs.flatten(),
|
||||
coefficients=np.full(indices.size, arg),
|
||||
constant=0.0,
|
||||
)
|
||||
if scalar is not None:
|
||||
else:
|
||||
arg: npt.NDArray[np.double] = np.array(arg, dtype=np.double)
|
||||
assert container.shape == arg.shape, (container.shape, arg.shape)
|
||||
return _WeightedSum(
|
||||
variable_indices=indices.flatten(),
|
||||
coefficients=np.full(indices.size, scalar),
|
||||
coefficients=arg.flatten(),
|
||||
constant=0.0,
|
||||
)
|
||||
|
||||
raise ValueError('Cannot call dot_variable_container without argument')
|
||||
|
||||
|
||||
class VarCompVar:
|
||||
"""Represents var == /!= var."""
|
||||
|
||||
Reference in New Issue
Block a user