BLI: generalize task size hints for parallel_for

This integrates the functionality of `parallel_for_weighted` from 9a3ceb79de
into `parallel_for`. This reduces the number of entry points to the threading
API and also makes it easier to build higher level threading primitives. For
example, `IndexMask::foreach_*` may use `parallel_for` if a `GrainSize` is
provided, but can't use `parallel_for_weighted` easily without duplicating a
fair amount of code.

The default behavior of `parallel_for` does not change. However, now one can
optionally pass in `TaskSizeHints` as the last parameter. This can be used to
specify the size of individual tasks relative to each other and relative to the
grain size. This helps schedule more equally sized tasks, which generally
improves performance because threads are used more effectively.
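
For illustration, a minimal sketch of the new call shape (the `item_weights`
span and `process_items` function are hypothetical, not part of this patch):

    #include "BLI_task.hh"

    static void process_items(const blender::Span<int64_t> item_weights)
    {
      using namespace blender;
      threading::parallel_for(
          item_weights.index_range(),
          512,
          [&](const IndexRange range) {
            for (const int64_t i : range) {
              /* Do the actual work for item i. */
            }
          },
          /* Relative cost of each task, so cheap tasks get grouped together. */
          threading::individual_task_sizes([&](const int64_t i) { return item_weights[i]; }));
    }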

One generally does not construct `TaskSizeHints` manually, but calls either
`threading::individual_task_sizes` or `threading::accumulated_task_sizes`. Both
allow specifying individual task sizes, but the latter should be used when the
combined size of consecutive tasks can be computed in O(1) time. This allows
splitting up the work more efficiently. It can often be used in conjunction with
`OffsetIndices`.
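
A similar sketch for the accumulated variant, assuming each task corresponds to
a slice described by an `OffsetIndices` (the names here are illustrative):

    #include "BLI_offset_indices.hh"
    #include "BLI_task.hh"

    static void process_groups(const blender::OffsetIndices<int> groups)
    {
      using namespace blender;
      threading::parallel_for(
          groups.index_range(),
          512,
          [&](const IndexRange range) {
            for (const int64_t group_i : range) {
              /* Process all elements in groups[group_i]. */
            }
          },
          /* The combined size of consecutive tasks is an O(1) lookup into the offsets. */
          threading::accumulated_task_sizes(
              [&](const IndexRange range) { return groups[range].size(); }));
    }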

Pull Request: https://projects.blender.org/blender/blender/pulls/121127
Jacques Lucke
2024-04-29 23:55:22 +02:00
parent d588bfdb5e
commit 8d13a9608b
7 changed files with 300 additions and 73 deletions

@@ -36,6 +36,7 @@
#include "BLI_index_range.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_span.hh"
#include "BLI_task_size_hints.hh"
#include "BLI_utildefines.h"
namespace blender {
@@ -68,59 +69,43 @@ inline void parallel_for_each(Range &&range, const Function &function)
namespace detail {
void parallel_for_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function);
void parallel_for_weighted_impl(IndexRange range,
int64_t grain_size,
FunctionRef<void(IndexRange)> function,
FunctionRef<void(IndexRange, MutableSpan<int64_t>)> task_sizes_fn);
FunctionRef<void(IndexRange)> function,
const TaskSizeHints &size_hints);
void memory_bandwidth_bound_task_impl(FunctionRef<void()> function);
} // namespace detail
/**
* Executes the given function for sub-ranges of the given range, potentially in parallel.
* This is the main primitive for parallelizing code.
*
* \param range: The indices that should be iterated over in parallel.
* \param grain_size: The approximate amount of work that should be scheduled at once.
* For example, if the range is [0 - 1000] and the grain size is 200, then the function will be
* called 5 times with [0 - 200], [201 - 400], ... (approximately). The `size_hints` parameter
* can be used to adjust how the work is split up if the tasks have different sizes.
* \param function: A callback that actually does the work in parallel. It should have one
* #IndexRange parameter.
* \param size_hints: Can be used to specify the size of the tasks *relative to* each other and the
* grain size. If all tasks have approximately the same size, this can be ignored. Otherwise, one
* can use `threading::individual_task_sizes(...)` or `threading::accumulated_task_sizes(...)`.
* If the grain size is e.g. 200 and each task has the size 100, then only two tasks will be
* scheduled at once.
*/
template<typename Function>
inline void parallel_for(IndexRange range, int64_t grain_size, const Function &function)
inline void parallel_for(const IndexRange range,
const int64_t grain_size,
const Function &function,
const TaskSizeHints &size_hints = detail::TaskSizeHints_Static(1))
{
if (range.is_empty()) {
return;
}
if (range.size() <= grain_size) {
/* Invoking tbb for small workloads has a large overhead. */
if (use_single_thread(size_hints, range, grain_size)) {
function(range);
return;
}
detail::parallel_for_impl(range, grain_size, function);
}
/**
* Almost like `parallel_for` but allows passing in a function that estimates the amount of work
* per index. This allows distributing work to threads more evenly.
*
* Using this function makes sense when the workload for each index can differ significantly, so
* that it is impossible to determine a good constant grain size.
*
* This function has a bit more overhead than the unweighted #parallel_for. Whether that is
* noticeable depends highly on the use-case, so the overhead should be measured when trying to
* use this function for cases where all tasks may be very small.
*
* \param task_size_fn: Gets the task index as input and computes that task's size.
* \param grain_size: Determines approximately how large a combined task should be. For example, if
* the grain size is 100, then 5 tasks of size 20 fit into it.
*/
template<typename Function, typename TaskSizeFn>
inline void parallel_for_weighted(IndexRange range,
int64_t grain_size,
const Function &function,
const TaskSizeFn &task_size_fn)
{
if (range.is_empty()) {
return;
}
detail::parallel_for_weighted_impl(
range, grain_size, function, [&](const IndexRange sub_range, MutableSpan<int64_t> r_sizes) {
for (const int64_t i : sub_range.index_range()) {
const int64_t task_size = task_size_fn(sub_range[i]);
BLI_assert(task_size >= 0);
r_sizes[i] = task_size;
}
});
detail::parallel_for_impl(range, grain_size, function, size_hints);
}
/**

@@ -0,0 +1,176 @@
/* SPDX-FileCopyrightText: 2023 Blender Authors
*
* SPDX-License-Identifier: GPL-2.0-or-later */
#pragma once
/** \file
* \ingroup bli
*/
#include <optional>
#include "BLI_index_range.hh"
#include "BLI_span.hh"
#include "BLI_utildefines.h"
namespace blender::threading {
/**
* Specifies how large the individual tasks are relative to each other. It's common that all tasks
* have a very similar size in which case one can just ignore this. However, sometimes tasks have
* very different sizes and it makes sense for the scheduler to group fewer big tasks and many
* small tasks together.
*/
class TaskSizeHints {
public:
enum class Type {
/** All tasks have the same size. */
Static,
/** All tasks can have different sizes and one has to look up the sizes one by one. */
IndividualLookup,
/**
* All tasks can have different sizes but one can efficiently determine the size of a
* consecutive range of tasks.
*/
AccumulatedLookup,
};
Type type;
protected:
TaskSizeHints(const Type type) : type(type) {}
};
namespace detail {
class TaskSizeHints_Static : public TaskSizeHints {
public:
int64_t size;
TaskSizeHints_Static(const int64_t size) : TaskSizeHints(Type::Static), size(size) {}
};
class TaskSizeHints_IndividualLookup : public TaskSizeHints {
public:
std::optional<int64_t> full_size;
TaskSizeHints_IndividualLookup(std::optional<int64_t> full_size)
: TaskSizeHints(Type::IndividualLookup), full_size(full_size)
{
}
/** Get the individual size of all tasks in the range. */
virtual void lookup_individual_sizes(IndexRange /*range*/,
MutableSpan<int64_t> r_sizes) const = 0;
};
class TaskSizeHints_AccumulatedLookup : public TaskSizeHints {
public:
TaskSizeHints_AccumulatedLookup() : TaskSizeHints(Type::AccumulatedLookup) {}
/** Get the accumulated size of a range of tasks. */
virtual int64_t lookup_accumulated_size(IndexRange range) const = 0;
};
template<typename Fn>
class TaskSizeHints_IndividualLookupFn : public TaskSizeHints_IndividualLookup {
private:
Fn fn_;
public:
TaskSizeHints_IndividualLookupFn(Fn fn, const std::optional<int64_t> full_size)
: TaskSizeHints_IndividualLookup(full_size), fn_(std::move(fn))
{
}
void lookup_individual_sizes(const IndexRange range, MutableSpan<int64_t> r_sizes) const override
{
fn_(range, r_sizes);
}
};
template<typename Fn>
class TaskSizeHints_AccumulatedLookupFn : public TaskSizeHints_AccumulatedLookup {
private:
Fn fn_;
public:
TaskSizeHints_AccumulatedLookupFn(Fn fn) : TaskSizeHints_AccumulatedLookup(), fn_(std::move(fn))
{
}
int64_t lookup_accumulated_size(const IndexRange range) const override
{
return fn_(range);
}
};
} // namespace detail
inline bool use_single_thread(const TaskSizeHints &size_hints,
const IndexRange range,
const int64_t threshold)
{
switch (size_hints.type) {
case TaskSizeHints::Type::Static: {
const int64_t size = static_cast<const detail::TaskSizeHints_Static &>(size_hints).size;
return size * range.size() <= threshold;
}
case TaskSizeHints::Type::IndividualLookup: {
const std::optional<int64_t> &full_size =
static_cast<const detail::TaskSizeHints_IndividualLookup &>(size_hints).full_size;
if (full_size.has_value()) {
if (*full_size <= threshold) {
return true;
}
}
return false;
}
case TaskSizeHints::Type::AccumulatedLookup: {
const int64_t accumulated_size =
static_cast<const detail::TaskSizeHints_AccumulatedLookup &>(size_hints)
.lookup_accumulated_size(range);
return accumulated_size <= threshold;
}
}
BLI_assert_unreachable();
return true;
}
/**
* Specify how large the task at each index is with a callback. This is especially useful if the
* size of each individual task can be very different. Specifying the size allows the scheduler to
* distribute the work across threads more equally.
*
* \param fn: A function that returns the size for a single task: `(int64_t index) -> int64_t`.
* \param full_size: The (approximate) accumulated size of all tasks. This is optional and should
* only be passed in if it is trivially accessible already.
*/
template<typename Fn>
inline auto individual_task_sizes(Fn &&fn, const std::optional<int64_t> full_size = std::nullopt)
{
auto array_fn = [fn = std::forward<Fn>(fn)](const IndexRange range,
MutableSpan<int64_t> r_sizes) {
for (const int64_t i : range.index_range()) {
r_sizes[i] = fn(range[i]);
}
};
return detail::TaskSizeHints_IndividualLookupFn<decltype(array_fn)>(std::move(array_fn),
full_size);
}
/**
* Very similar to #individual_task_sizes, but should be used if one can very efficiently compute
* the accumulated task size (in O(1) time). This is often the case when e.g. working with
* #OffsetIndices.
*
* \param fn: A function that returns the accumulated size for a range of tasks:
* `(IndexRange indices) -> int64_t`.
*/
template<typename Fn> inline auto accumulated_task_sizes(Fn &&fn)
{
return detail::TaskSizeHints_AccumulatedLookupFn<decltype(fn)>(std::forward<Fn>(fn));
}
} // namespace blender::threading

@@ -367,6 +367,7 @@ set(SRC
BLI_system.h
BLI_task.h
BLI_task.hh
BLI_task_size_hints.hh
BLI_tempfile.h
BLI_threads.h
BLI_time.h

@@ -161,32 +161,21 @@ int BLI_task_parallel_thread_id(const TaskParallelTLS * /*tls*/)
namespace blender::threading::detail {
void parallel_for_impl(const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function)
static void parallel_for_impl_static_size(const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function)
{
#ifdef WITH_TBB
/* Invoking tbb for small workloads has a large overhead. */
if (range.size() >= grain_size) {
lazy_threading::send_hint();
tbb::parallel_for(
tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
[function](const tbb::blocked_range<int64_t> &subrange) {
function(IndexRange(subrange.begin(), subrange.size()));
});
return;
}
#else
UNUSED_VARS(grain_size);
#endif
function(range);
tbb::parallel_for(tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
[function](const tbb::blocked_range<int64_t> &subrange) {
function(IndexRange(subrange.begin(), subrange.size()));
});
}
void parallel_for_weighted_impl(
static void parallel_for_impl_individual_size_lookup(
const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function,
const FunctionRef<void(IndexRange, MutableSpan<int64_t>)> task_sizes_fn)
const TaskSizeHints_IndividualLookup &size_hints)
{
/* Shouldn't be too small, because then there is more overhead when the individual tasks are
* small. Also shouldn't be too large because then the serial code to split up tasks causes extra
@@ -195,7 +184,7 @@ void parallel_for_weighted_impl(
threading::parallel_for(range, outer_grain_size, [&](const IndexRange sub_range) {
/* Compute the size of every task in the current range. */
Array<int64_t, 1024> task_sizes(sub_range.size());
task_sizes_fn(sub_range, task_sizes);
size_hints.lookup_individual_sizes(sub_range, task_sizes);
/* Split range into multiple segments that have a size that approximates the grain size. */
Vector<int64_t, 256> offsets_vec;
@@ -223,6 +212,75 @@ void parallel_for_weighted_impl(
});
}
static void parallel_for_impl_accumulated_size_lookup(
const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function,
const TaskSizeHints_AccumulatedLookup &size_hints)
{
BLI_assert(!range.is_empty());
if (range.size() == 1) {
/* Can't subdivide further. */
function(range);
return;
}
const int64_t total_size = size_hints.lookup_accumulated_size(range);
if (total_size <= grain_size) {
function(range);
return;
}
const int64_t middle = range.size() / 2;
const IndexRange left_range = range.take_front(middle);
const IndexRange right_range = range.drop_front(middle);
threading::parallel_invoke(
[&]() {
parallel_for_impl_accumulated_size_lookup(left_range, grain_size, function, size_hints);
},
[&]() {
parallel_for_impl_accumulated_size_lookup(right_range, grain_size, function, size_hints);
});
}
void parallel_for_impl(const IndexRange range,
const int64_t grain_size,
const FunctionRef<void(IndexRange)> function,
const TaskSizeHints &size_hints)
{
#ifdef WITH_TBB
lazy_threading::send_hint();
switch (size_hints.type) {
case TaskSizeHints::Type::Static: {
const int64_t task_size = static_cast<const detail::TaskSizeHints_Static &>(size_hints).size;
const int64_t final_grain_size = task_size == 1 ?
grain_size :
std::max<int64_t>(1, grain_size / task_size);
parallel_for_impl_static_size(range, final_grain_size, function);
break;
}
case TaskSizeHints::Type::IndividualLookup: {
parallel_for_impl_individual_size_lookup(
range,
grain_size,
function,
static_cast<const detail::TaskSizeHints_IndividualLookup &>(size_hints));
break;
}
case TaskSizeHints::Type::AccumulatedLookup: {
parallel_for_impl_accumulated_size_lookup(
range,
grain_size,
function,
static_cast<const detail::TaskSizeHints_AccumulatedLookup &>(size_hints));
break;
}
}
#else
UNUSED_VARS(grain_size, size_hints);
function(range);
#endif
}
void memory_bandwidth_bound_task_impl(const FunctionRef<void()> function)
{
#ifdef WITH_TBB

@@ -123,7 +123,7 @@ class ProximityFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.resize(groups_num);
threading::parallel_for_weighted(
threading::parallel_for(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
@@ -136,7 +136,8 @@ class ProximityFunction : public mf::MultiFunction {
BKE_bvhtree_from_pointcloud_get(pointcloud, group_mask, bvh);
}
},
[&](const int group_i) { return group_masks[group_i].size(); });
threading::individual_task_sizes(
[&](const int group_i) { return group_masks[group_i].size(); }, pointcloud.totpoint));
}
void init_for_mesh(const Mesh &mesh, const Field<int> &group_id_field)
@@ -156,7 +157,7 @@ class ProximityFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.resize(groups_num);
threading::parallel_for_weighted(
threading::parallel_for(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
@@ -182,7 +183,8 @@ class ProximityFunction : public mf::MultiFunction {
}
}
},
[&](const int group_i) { return group_masks[group_i].size(); });
threading::individual_task_sizes(
[&](const int group_i) { return group_masks[group_i].size(); }, domain_size));
}
bke::AttrDomain get_domain_on_mesh() const

@@ -116,7 +116,7 @@ class SampleNearestSurfaceFunction : public mf::MultiFunction {
/* Construct BVH tree for each group. */
bvh_trees_.reinitialize(groups_num);
threading::parallel_for_weighted(
threading::parallel_for(
IndexRange(groups_num),
512,
[&](const IndexRange range) {
@@ -126,7 +126,8 @@ class SampleNearestSurfaceFunction : public mf::MultiFunction {
BKE_bvhtree_from_mesh_tris_init(mesh, group_mask, bvh);
}
},
[&](const int group_i) { return group_masks[group_i].size(); });
threading::individual_task_sizes(
[&](const int group_i) { return group_masks[group_i].size(); }, mesh.faces_num));
}
~SampleNearestSurfaceFunction()

@@ -225,7 +225,7 @@ static void scale_uniformly(const GroupedSpan<int> elem_islands,
Mesh &mesh)
{
MutableSpan<float3> positions = mesh.vert_positions_for_write();
threading::parallel_for_weighted(
threading::parallel_for(
elem_islands.index_range(),
512,
[&](const IndexRange range) {
@@ -243,7 +243,9 @@ static void scale_uniformly(const GroupedSpan<int> elem_islands,
});
}
},
[&](const int64_t i) { return vert_islands[i].size(); });
threading::accumulated_task_sizes([&](const IndexRange range) {
return elem_islands.offsets[range].size() + vert_islands.offsets[range].size();
}));
}
static float4x4 create_single_axis_transform(const float3 &center,
@@ -294,7 +296,7 @@ static void scale_on_axis(const GroupedSpan<int> elem_islands,
Mesh &mesh)
{
MutableSpan<float3> positions = mesh.vert_positions_for_write();
threading::parallel_for_weighted(
threading::parallel_for(
elem_islands.index_range(),
512,
[&](const IndexRange range) {
@@ -315,7 +317,9 @@ static void scale_on_axis(const GroupedSpan<int> elem_islands,
});
}
},
[&](const int64_t i) { return vert_islands[i].size(); });
threading::accumulated_task_sizes([&](const IndexRange range) {
return vert_islands.offsets[range].size() + elem_islands.offsets[range].size();
}));
}
static int face_to_vert_islands(const Mesh &mesh,