BLI: add weighted parallel for function

The standard `threading::parallel_for` function tries to split the range into
uniformly sized subranges. This is great if each element takes approximately
the same amount of time to compute.

However, there are also situations where the time required to do the work for
a single index differs significantly between different indices. In such a case,
it's better to split the tasks into segments while taking the size of each task into
account.

This patch implements `threading::parallel_for_weighted` which allows passing
in an additional callback that returns the size of each task.

Pull Request: https://projects.blender.org/blender/blender/pulls/118348
This commit is contained in:
Jacques Lucke
2024-02-25 15:01:05 +01:00
parent 91895bf806
commit 9a3ceb79de
5 changed files with 159 additions and 59 deletions

View File

@@ -35,6 +35,7 @@
#include "BLI_function_ref.hh"
#include "BLI_index_range.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_span.hh"
#include "BLI_utildefines.h"
namespace blender {
@@ -68,6 +69,10 @@ namespace detail {
void parallel_for_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function);
void parallel_for_weighted_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function,
FunctionRef<void(IndexRange, MutableSpan<int64_t>)> task_sizes_fn);
} // namespace detail
template<typename Function>
@@ -83,6 +88,40 @@ inline void parallel_for(IndexRange range, int64_t grain_size, const Function &f
detail::parallel_for_impl(range, grain_size, function);
}
/**
* Almost like `parallel_for` but allows passing in a function that estimates the amount of work
* per index. This allows distributing work to threads more evenly.
*
* Using this function makes sense when the work load for each index can differ significantly, so
* that it is impossible to determine a good constant grain size.
*
* This function has a bit more overhead than the unweighted #parallel_for. If that is noticable
* highly depends on the use-case. So the overhead should be measured when trying to use this
* function for cases where all tasks may be very small.
*
* \param task_size_fn: Gets the task index as input and computes that tasks size.
* \param grain_size: Determines approximately how large a combined task should be. For example, if
* the grain size is 100, then 5 tasks of size 20 fit into it.
*/
template<typename Function, typename TaskSizeFn>
inline void parallel_for_weighted(IndexRange range,
int64_t grain_size,
const Function &function,
const TaskSizeFn &task_size_fn)
{
if (range.is_empty()) {
return;
}
detail::parallel_for_weighted_impl(
range, grain_size, function, [&](const IndexRange sub_range, MutableSpan<int64_t> r_sizes) {
for (const int64_t i : sub_range.index_range()) {
const int64_t task_size = task_size_fn(sub_range[i]);
BLI_assert(task_size >= 0);
r_sizes[i] = task_size;
}
});
}
/**
* Move the sub-range boundaries down to the next aligned index. The "global" begin and end
* remain fixed though.

View File

@@ -12,10 +12,13 @@
#include "MEM_guardedalloc.h"
#include "BLI_array.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_offset_indices.hh"
#include "BLI_task.h"
#include "BLI_task.hh"
#include "BLI_threads.h"
#include "BLI_vector.hh"
#include "atomic_ops.h"
@@ -179,4 +182,45 @@ void parallel_for_impl(const IndexRange range,
function(range);
}
void parallel_for_weighted_impl(
const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function,
const FunctionRef<void(IndexRange, MutableSpan<int64_t>)> task_sizes_fn)
{
/* Shouldn't be too small, because then there is more overhead when the individual tasks are
* small. Also shouldn't be too large because then the serial code to split up tasks causes extra
* overhead. */
const int64_t outer_grain_size = std::min<int64_t>(grain_size, 512);
threading::parallel_for(range, outer_grain_size, [&](const IndexRange sub_range) {
/* Compute the size of every task in the current range. */
Array<int64_t, 1024> task_sizes(sub_range.size());
task_sizes_fn(sub_range, task_sizes);
/* Split range into multiple segments that have a size that approximates the grain size. */
Vector<int64_t, 256> offsets_vec;
offsets_vec.append(0);
int64_t counter = 0;
for (const int64_t i : sub_range.index_range()) {
counter += task_sizes[i];
if (counter >= grain_size) {
offsets_vec.append(i + 1);
counter = 0;
}
}
if (offsets_vec.last() < sub_range.size()) {
offsets_vec.append(sub_range.size());
}
const OffsetIndices<int64_t> offsets = offsets_vec.as_span();
/* Run the dynamically split tasks in parallel. */
threading::parallel_for(offsets.index_range(), 1, [&](const IndexRange offsets_range) {
for (const int64_t i : offsets_range) {
const IndexRange actual_range = offsets[i].shift(sub_range.start());
function(actual_range);
}
});
});
}
} // namespace blender::threading::detail

View File

@@ -123,16 +123,20 @@ class ProximityFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.resize(groups_num);
threading::parallel_for(IndexRange(groups_num), 16, [&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
if (group_mask.is_empty()) {
continue;
}
BVHTreeFromPointCloud &bvh = bvh_trees_[group_i].pointcloud_bvh;
BKE_bvhtree_from_pointcloud_get(pointcloud, group_mask, bvh);
}
});
threading::parallel_for_weighted(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
if (group_mask.is_empty()) {
continue;
}
BVHTreeFromPointCloud &bvh = bvh_trees_[group_i].pointcloud_bvh;
BKE_bvhtree_from_pointcloud_get(pointcloud, group_mask, bvh);
}
},
[&](const int group_i) { return group_masks[group_i].size(); });
}
void init_for_mesh(const Mesh &mesh, const Field<int> &group_id_field)
@@ -152,29 +156,33 @@ class ProximityFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.resize(groups_num);
threading::parallel_for(IndexRange(groups_num), 16, [&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
if (group_mask.is_empty()) {
continue;
}
BVHTreeFromMesh &bvh = bvh_trees_[group_i].mesh_bvh;
switch (type_) {
case GEO_NODE_PROX_TARGET_POINTS: {
BKE_bvhtree_from_mesh_verts_init(mesh, group_mask, bvh);
break;
threading::parallel_for_weighted(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
if (group_mask.is_empty()) {
continue;
}
BVHTreeFromMesh &bvh = bvh_trees_[group_i].mesh_bvh;
switch (type_) {
case GEO_NODE_PROX_TARGET_POINTS: {
BKE_bvhtree_from_mesh_verts_init(mesh, group_mask, bvh);
break;
}
case GEO_NODE_PROX_TARGET_EDGES: {
BKE_bvhtree_from_mesh_edges_init(mesh, group_mask, bvh);
break;
}
case GEO_NODE_PROX_TARGET_FACES: {
BKE_bvhtree_from_mesh_tris_init(mesh, group_mask, bvh);
break;
}
}
}
case GEO_NODE_PROX_TARGET_EDGES: {
BKE_bvhtree_from_mesh_edges_init(mesh, group_mask, bvh);
break;
}
case GEO_NODE_PROX_TARGET_FACES: {
BKE_bvhtree_from_mesh_tris_init(mesh, group_mask, bvh);
break;
}
}
}
});
},
[&](const int group_i) { return group_masks[group_i].size(); });
}
bke::AttrDomain get_domain_on_mesh() const

View File

@@ -116,13 +116,17 @@ class SampleNearestSurfaceFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.reinitialize(groups_num);
threading::parallel_for(IndexRange(groups_num), 16, [&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
BVHTreeFromMesh &bvh = bvh_trees_[group_i];
BKE_bvhtree_from_mesh_tris_init(mesh, group_mask, bvh);
}
});
threading::parallel_for_weighted(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
for (const int group_i : range) {
const IndexMask &group_mask = group_masks[group_i];
BVHTreeFromMesh &bvh = bvh_trees_[group_i];
BKE_bvhtree_from_mesh_tris_init(mesh, group_mask, bvh);
}
},
[&](const int group_i) { return group_masks[group_i].size(); });
}
~SampleNearestSurfaceFunction()

View File

@@ -165,30 +165,35 @@ static void scale_vertex_islands_uniformly(Mesh &mesh,
const OffsetIndices faces = mesh.faces();
const Span<int> corner_verts = mesh.corner_verts();
threading::parallel_for(islands.index_range(), 256, [&](const IndexRange range) {
for (const int island_index : range) {
const ElementIsland &island = islands[island_index];
threading::parallel_for_weighted(
islands.index_range(),
512,
[&](const IndexRange range) {
for (const int island_index : range) {
const ElementIsland &island = islands[island_index];
float scale = 0.0f;
float3 center = {0.0f, 0.0f, 0.0f};
float scale = 0.0f;
float3 center = {0.0f, 0.0f, 0.0f};
VectorSet<int> vertex_indices;
for (const int face_index : island.element_indices) {
get_vertex_indices(edges, faces, corner_verts, face_index, vertex_indices);
center += params.centers[face_index];
scale += params.scales[face_index];
}
VectorSet<int> vertex_indices;
for (const int face_index : island.element_indices) {
get_vertex_indices(edges, faces, corner_verts, face_index, vertex_indices);
center += params.centers[face_index];
scale += params.scales[face_index];
}
/* Divide by number of elements to get the average. */
const float f = 1.0f / island.element_indices.size();
scale *= f;
center *= f;
/* Divide by number of elements to get the average. */
const float f = 1.0f / island.element_indices.size();
scale *= f;
center *= f;
for (const int vert_index : vertex_indices) {
positions[vert_index] = transform_with_uniform_scale(positions[vert_index], center, scale);
}
}
});
for (const int vert_index : vertex_indices) {
positions[vert_index] = transform_with_uniform_scale(
positions[vert_index], center, scale);
}
}
},
[&](const int64_t i) { return islands[i].element_indices.size(); });
mesh.tag_positions_changed();
}