23#include "Eigen/SparseCore"
24#include "absl/synchronization/blocking_counter.h"
25#include "absl/time/time.h"
33using ::Eigen::VectorXd;
37 const std::function<int64_t(int64_t)>& element_mass)
38 : thread_pool_(thread_pool) {
40 if (num_elements == 0) {
41 shard_starts_.push_back(0);
45 int64_t overall_mass = 0;
46 for (int64_t elem = 0; elem < num_elements; ++elem) {
47 overall_mass += element_mass(elem);
49 shard_starts_.push_back(0);
50 int64_t this_shard_mass = element_mass(0);
51 for (int64_t elem = 1; elem < num_elements; ++elem) {
52 int64_t this_elem_mass = element_mass(elem);
53 if (this_shard_mass + (this_elem_mass / 2) >= overall_mass / num_shards) {
55 shard_masses_.push_back(this_shard_mass);
56 shard_starts_.push_back(elem);
57 this_shard_mass = this_elem_mass;
59 this_shard_mass += this_elem_mass;
62 shard_starts_.push_back(num_elements);
63 shard_masses_.push_back(this_shard_mass);
69 : thread_pool_(thread_pool) {
71 if (num_elements == 0) {
72 shard_starts_.push_back(0);
76 shard_starts_.reserve(num_shards + 1);
77 shard_masses_.reserve(num_shards);
78 for (
int shard = 0; shard < num_shards; ++shard) {
79 const int64_t this_shard_start = ((num_elements * shard) / num_shards);
80 const int64_t next_shard_start =
81 ((num_elements * (shard + 1)) / num_shards);
82 if (next_shard_start - this_shard_start > 0) {
83 shard_starts_.push_back(this_shard_start);
84 shard_masses_.push_back(next_shard_start - this_shard_start);
87 shard_starts_.push_back(num_elements);
94 :
Sharder(num_elements,
std::
max(1, other_sharder.NumShards()),
95 other_sharder.thread_pool_) {}
98 const std::function<
void(
const Shard&)>& func)
const {
100 absl::BlockingCounter counter(
NumShards());
101 VLOG(2) <<
"Starting ParallelForEachShard()";
102 for (
int shard_num = 0; shard_num <
NumShards(); ++shard_num) {
103 thread_pool_->
Schedule([&, shard_num]() {
108 func(
Shard(shard_num,
this));
111 VLOG(2) <<
"Shard " << shard_num <<
" with " <<
ShardSize(shard_num)
112 <<
" elements and " <<
ShardMass(shard_num)
113 <<
" mass finished with "
115 std::max(int64_t{1}, absl::ToInt64Microseconds(
119 counter.DecrementCount();
123 VLOG(2) <<
"Done ParallelForEachShard()";
125 for (
int shard_num = 0; shard_num <
NumShards(); ++shard_num) {
126 func(
Shard(shard_num,
this));
132 const std::function<
double(
const Shard&)>& func)
const {
135 local_sums[shard.
Index()] = func(shard);
137 return local_sums.sum();
141 const std::function<
bool(
const Shard&)>& func)
const {
143 std::vector<int> local_result(
NumShards());
145 local_result[shard.
Index()] =
static_cast<int>(func(shard));
147 return std::all_of(local_result.begin(), local_result.end(),
148 [](
const int v) { return static_cast<bool>(v); });
152 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
153 const VectorXd& vector,
const Sharder& sharder) {
154 CHECK_EQ(vector.size(), matrix.rows());
155 VectorXd answer(matrix.cols());
157 shard(answer) = shard(matrix).transpose() * vector;
163 const Sharder& sharder, VectorXd& dest) {
165 shard(dest) += scale * shard(increment);
170 dest.resize(vec.size());
183 const Sharder& sharder, VectorXd& dest) {
185 shard(dest) = shard(dest).cwiseProduct(shard(scale));
191 const Sharder& sharder, VectorXd& dest) {
193 shard(dest) = shard(dest).cwiseQuotient(shard(scale));
197double Dot(
const VectorXd& v1,
const VectorXd& v2,
const Sharder& sharder) {
199 [&](
const Sharder::Shard& shard) {
return shard(v1).dot(shard(v2)); });
205 local_max[shard.
Index()] = shard(vector).lpNorm<Eigen::Infinity>();
207 return local_max.lpNorm<Eigen::Infinity>();
212 [&](
const Sharder::Shard& shard) {
return shard(vector).lpNorm<1>(); });
217 [&](
const Sharder::Shard& shard) {
return shard(vector).squaredNorm(); });
227 return (shard(vector1) - shard(vector2)).squaredNorm();
231double Distance(
const VectorXd& vector1,
const VectorXd& vector2,
240 local_max[shard.
Index()] =
241 shard(vector).cwiseProduct(shard(scale)).lpNorm<Eigen::Infinity>();
243 return local_max.lpNorm<Eigen::Infinity>();
249 return shard(vector).cwiseProduct(shard(scale)).squaredNorm();
253double ScaledNorm(
const VectorXd& vector,
const VectorXd& scale,
259 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
260 const VectorXd& row_scaling_vec,
const VectorXd& col_scaling_vec,
262 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
263 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
264 VectorXd answer(matrix.cols());
266 auto matrix_shard = shard(matrix);
267 auto col_scaling_shard = shard(col_scaling_vec);
268 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
270 for (
decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
272 max =
std::max(
max, std::abs(it.value() * row_scaling_vec[it.row()]));
274 shard(answer)[col_num] =
max * std::abs(col_scaling_shard[col_num]);
281 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
282 const VectorXd& row_scaling_vec,
const VectorXd& col_scaling_vec,
284 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
285 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
286 VectorXd answer(matrix.cols());
288 auto matrix_shard = shard(matrix);
289 auto col_scaling_shard = shard(col_scaling_vec);
290 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
291 double sum_of_squares = 0.0;
292 for (
decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
297 shard(answer)[col_num] =
298 std::sqrt(sum_of_squares) * std::abs(col_scaling_shard[col_num]);
305 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
308 auto matrix_shard = shard(matrix);
310 for (int64_t col_idx = 0; col_idx < matrix_shard.outerSize(); ++col_idx) {
311 for (
decltype(matrix_shard)::InnerIterator it(matrix_shard, col_idx); it;
313 if (it.row() != (col_offset + it.col()))
return false;
#define CHECK_EQ(val1, val2)
#define CHECK_GE(val1, val2)
#define VLOG(verboselevel)
absl::Duration GetDuration() const
static T Square(const T x)
void Schedule(std::function< void()> closure)
Sharder(int64_t num_elements, int num_shards, ThreadPool *thread_pool, const std::function< int64_t(int64_t)> &element_mass)
double ParallelSumOverShards(const std::function< double(const Shard &)> &func) const
void ParallelForEachShard(const std::function< void(const Shard &)> &func) const
bool ParallelTrueForAllShards(const std::function< bool(const Shard &)> &func) const
int64_t ShardSize(int shard) const
int64_t ShardStart(int shard) const
int64_t ShardMass(int shard) const
double SquaredNorm(const VectorXd &vector, const Sharder &sharder)
double ScaledNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
double Dot(const VectorXd &v1, const VectorXd &v2, const Sharder &sharder)
double SquaredDistance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
double LInfNorm(const VectorXd &vector, const Sharder &sharder)
double Distance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
VectorXd TransposedMatrixVectorProduct(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &vector, const Sharder &sharder)
double ScaledLInfNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
double ScaledSquaredNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
VectorXd ScaledColLInfNorm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
void AddScaledVector(const double scale, const VectorXd &increment, const Sharder &sharder, VectorXd &dest)
void CoefficientWiseProductInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
void CoefficientWiseQuotientInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
VectorXd ScaledColL2Norm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
double L1Norm(const VectorXd &vector, const Sharder &sharder)
VectorXd CloneVector(const VectorXd &vec, const Sharder &sharder)
bool IsDiagonal(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const Sharder &sharder)
double Norm(const VectorXd &vector, const Sharder &sharder)
void AssignVector(const VectorXd &vec, const Sharder &sharder, VectorXd &dest)
#define VLOG_IS_ON(verboselevel)