Core: Add BLI_thread_queue priority and cancel support

Add the functionality required to use BLI_thread_queue for shader compilation, as
discussed in #140214.

Pull Request: https://projects.blender.org/blender/blender/pulls/140992
Author: Miguel Pozo
Date: 2025-07-09 18:04:51 +02:00
Commit: f46aa9a1ea (parent: 6d9ad29c2a)
7 changed files with 198 additions and 43 deletions


@@ -160,16 +160,75 @@ void BLI_condition_end(ThreadCondition *cond);
typedef struct ThreadQueue ThreadQueue;
typedef enum {
BLI_THREAD_QUEUE_WORK_PRIORITY_LOW,
BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL,
BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH,
} ThreadQueueWorkPriority;
/**
* Allocate a new ThreadQueue.
*/
ThreadQueue *BLI_thread_queue_init(void);
/**
* Deallocate the ThreadQueue.
* Assumes no one is using the queue anymore.
*/
void BLI_thread_queue_free(ThreadQueue *queue);
void BLI_thread_queue_push(ThreadQueue *queue, void *work);
/**
* Push one work pointer to the queue.
* Higher-priority work items always take precedence over lower-priority ones, regardless of
* their insertion order. Work items with the same priority follow FIFO order.
*
* \returns a unique work_id that can be used later to cancel the work before it is popped
* from the queue.
*/
uint64_t BLI_thread_queue_push(ThreadQueue *queue, void *work, ThreadQueueWorkPriority priority);
/**
* Remove the pending work identified by work_id from the queue, if it has not been popped yet.
*/
void BLI_thread_queue_cancel_work(ThreadQueue *queue, uint64_t work_id);
/**
* Pop the oldest, highest-priority work item from the queue.
* Blocks the calling thread unless BLI_thread_queue_nowait has been called for this queue.
*
* \returns nullptr if nowait has been set and the queue is empty. Otherwise returns a work
* pointer.
*/
void *BLI_thread_queue_pop(ThreadQueue *queue);
/**
* Try to pop the oldest, highest-priority work item from the queue.
* Blocks the calling thread for at most ms milliseconds, unless BLI_thread_queue_nowait has
* been called for this queue.
*
* \returns nullptr if the timeout expires, or if nowait has been set and the queue is empty. Otherwise
* returns a work pointer.
*/
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
/**
* \returns the total number of pending work items still in the queue.
*/
int BLI_thread_queue_len(ThreadQueue *queue);
/**
* \returns true if there are no pending work items in the queue.
*/
bool BLI_thread_queue_is_empty(ThreadQueue *queue);
/**
* Blocks the calling thread until the queue is empty.
*/
void BLI_thread_queue_wait_finish(ThreadQueue *queue);
/**
* After calling this function, BLI_thread_queue_pop and BLI_thread_queue_pop_timeout won't block
* the calling thread even when the queue is empty.
*/
void BLI_thread_queue_nowait(ThreadQueue *queue);
/* Thread local storage */
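
Taken together, these functions cover a prioritized producer/consumer setup: push work with a
priority, keep the returned id around for cancellation, and drain the queue from one or more
worker threads. A minimal usage sketch (editorial, not part of this commit; CompileJob,
worker() and example() are hypothetical names, and std::thread stands in for whichever thread
API the caller already uses):

#include <thread>

struct CompileJob {
  const char *name;
};

static void worker(ThreadQueue *queue)
{
  /* Blocks while the queue is empty; returns nullptr once nowait is set and
   * the queue has drained. */
  while (void *work = BLI_thread_queue_pop(queue)) {
    CompileJob *job = static_cast<CompileJob *>(work);
    /* ... process job ... */
    (void)job;
  }
}

static void example()
{
  ThreadQueue *queue = BLI_thread_queue_init();
  std::thread thread(worker, queue);

  CompileJob background = {"background"};
  CompileJob urgent = {"urgent"};
  const uint64_t id = BLI_thread_queue_push(
      queue, &background, BLI_THREAD_QUEUE_WORK_PRIORITY_LOW);
  BLI_thread_queue_push(queue, &urgent, BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH);

  /* Pending work can still be withdrawn by its id; already-popped work cannot. */
  BLI_thread_queue_cancel_work(queue, id);

  BLI_thread_queue_wait_finish(queue); /* Block until every pending item is popped. */
  BLI_thread_queue_nowait(queue);      /* Unblock the worker's final pop. */
  thread.join();
  BLI_thread_queue_free(queue);
}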


@@ -172,8 +172,10 @@ struct TaskPool {
ThreadQueue *background_queue;
volatile bool background_is_canceling = false;
eTaskPriority priority;
TaskPool(const TaskPoolType type, const eTaskPriority priority, void *userdata)
: type(type), userdata(userdata)
: type(type), userdata(userdata), priority(priority)
{
this->use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
@@ -196,8 +198,6 @@ struct TaskPool {
if (use_threads) {
this->tbb_group = std::make_unique<TBBTaskGroup>(priority);
}
#else
UNUSED_VARS(priority);
#endif
break;
}
@@ -423,7 +423,11 @@ void TaskPool::background_task_pool_run(Task &&task)
BLI_assert(ELEM(this->type, TASK_POOL_BACKGROUND, TASK_POOL_BACKGROUND_SERIAL));
Task *task_mem = MEM_new<Task>(__func__, std::move(task));
BLI_thread_queue_push(this->background_queue, task_mem);
BLI_thread_queue_push(this->background_queue,
task_mem,
this->priority == TASK_PRIORITY_HIGH ?
BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH :
BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
if (BLI_available_threads(&this->background_threads)) {
BLI_threadpool_insert(&this->background_threads, this);
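
Note how the pool's two-level eTaskPriority is folded into the queue's three levels: only
TASK_PRIORITY_HIGH is promoted to the HIGH band, so TASK_PRIORITY_LOW pools push into the
NORMAL band rather than the LOW one. If more call sites end up needing this mapping, it could
be factored into a helper along these lines (a sketch; the helper name is hypothetical):

static ThreadQueueWorkPriority task_priority_to_queue_priority(const eTaskPriority priority)
{
  /* Matches the ternary used in background_task_pool_run() above: only
   * TASK_PRIORITY_HIGH maps to the HIGH band. */
  return (priority == TASK_PRIORITY_HIGH) ? BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH :
                                            BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL;
}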


@@ -9,6 +9,7 @@
#include <algorithm> /* Required for std::remove_if() in BLI_thread_queue_cancel_work(). */
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <deque>
#include "MEM_guardedalloc.h"
@@ -604,21 +605,26 @@ void BLI_condition_end(ThreadCondition *cond)
/* ************************************************ */
struct ThreadQueueWork {
void *work;
uint64_t id;
};
struct ThreadQueue {
GSQueue *queue;
uint64_t current_id = 0;
std::deque<ThreadQueueWork> queue_low_priority;
std::deque<ThreadQueueWork> queue_normal_priority;
std::deque<ThreadQueueWork> queue_high_priority;
pthread_mutex_t mutex;
pthread_cond_t push_cond;
pthread_cond_t finish_cond;
volatile int nowait;
volatile int canceled;
volatile int nowait = 0;
volatile int canceled = 0;
};
ThreadQueue *BLI_thread_queue_init()
{
ThreadQueue *queue;
queue = MEM_callocN<ThreadQueue>("ThreadQueue");
queue->queue = BLI_gsqueue_new(sizeof(void *));
ThreadQueue *queue = MEM_new<ThreadQueue>(__func__);
pthread_mutex_init(&queue->mutex, nullptr);
pthread_cond_init(&queue->push_cond, nullptr);
@@ -634,44 +640,109 @@ void BLI_thread_queue_free(ThreadQueue *queue)
pthread_cond_destroy(&queue->push_cond);
pthread_mutex_destroy(&queue->mutex);
BLI_gsqueue_free(queue->queue);
MEM_freeN(queue);
MEM_delete(queue);
}
void BLI_thread_queue_push(ThreadQueue *queue, void *work)
uint64_t BLI_thread_queue_push(ThreadQueue *queue, void *work, ThreadQueueWorkPriority priority)
{
BLI_assert(work);
pthread_mutex_lock(&queue->mutex);
BLI_gsqueue_push(queue->queue, &work);
ThreadQueueWork work_reference;
work_reference.work = work;
work_reference.id = ++queue->current_id;
switch (priority) {
case BLI_THREAD_QUEUE_WORK_PRIORITY_LOW:
queue->queue_low_priority.push_back(work_reference);
break;
case BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL:
queue->queue_normal_priority.push_back(work_reference);
break;
case BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH:
queue->queue_high_priority.push_back(work_reference);
break;
}
/* signal threads waiting to pop */
pthread_cond_signal(&queue->push_cond);
pthread_mutex_unlock(&queue->mutex);
return work_reference.id;
}
/** WARNING: Assumes the queue is already locked. */
static void check_finalization(ThreadQueue *queue)
{
if (queue->queue_low_priority.empty() && queue->queue_normal_priority.empty() &&
queue->queue_high_priority.empty())
{
pthread_cond_signal(&queue->finish_cond);
}
}
void BLI_thread_queue_cancel_work(ThreadQueue *queue, uint64_t work_id)
{
pthread_mutex_lock(&queue->mutex);
bool found = false;
auto check = [&](const ThreadQueueWork &work) {
if (work.id == work_id) {
found = true;
return true;
}
return false;
};
auto cancel = [&](std::deque<ThreadQueueWork> &sub_queue) {
sub_queue.erase(std::remove_if(sub_queue.begin(), sub_queue.end(), check), sub_queue.end());
};
cancel(queue->queue_low_priority);
cancel(queue->queue_normal_priority);
cancel(queue->queue_high_priority);
if (found) {
check_finalization(queue);
}
pthread_mutex_unlock(&queue->mutex);
}
void *BLI_thread_queue_pop(ThreadQueue *queue)
{
void *work = nullptr;
ThreadQueueWork work_reference = {0};
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
while (!queue->nowait && queue->queue_low_priority.empty() &&
queue->queue_normal_priority.empty() && queue->queue_high_priority.empty())
{
pthread_cond_wait(&queue->push_cond, &queue->mutex);
}
/* if we have something, pop it */
if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
if (BLI_gsqueue_is_empty(queue->queue)) {
pthread_cond_broadcast(&queue->finish_cond);
for (std::deque<ThreadQueueWork> *sub_queue :
{&queue->queue_high_priority, &queue->queue_normal_priority, &queue->queue_low_priority})
{
if (sub_queue->empty()) {
continue;
}
work_reference = sub_queue->front();
sub_queue->pop_front();
/* Pop at most one work item. */
break;
}
if (work_reference.work) {
check_finalization(queue);
}
pthread_mutex_unlock(&queue->mutex);
return work;
return work_reference.work;
}
static void wait_timeout(timespec *timeout, int ms)
@@ -712,7 +783,7 @@ static void wait_timeout(timespec *timeout, int ms)
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
{
double t;
void *work = nullptr;
ThreadQueueWork work_reference = {0};
timespec timeout;
t = BLI_time_now_seconds();
@@ -720,7 +791,9 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
/* wait until there is work */
pthread_mutex_lock(&queue->mutex);
while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
while (!queue->nowait && queue->queue_low_priority.empty() &&
queue->queue_normal_priority.empty() && queue->queue_high_priority.empty())
{
if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT) {
break;
}
@@ -730,17 +803,26 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
}
/* if we have something, pop it */
if (!BLI_gsqueue_is_empty(queue->queue)) {
BLI_gsqueue_pop(queue->queue, &work);
if (BLI_gsqueue_is_empty(queue->queue)) {
pthread_cond_broadcast(&queue->finish_cond);
for (std::deque<ThreadQueueWork> *sub_queue :
{&queue->queue_high_priority, &queue->queue_normal_priority, &queue->queue_low_priority})
{
if (sub_queue->empty()) {
continue;
}
work_reference = sub_queue->front();
sub_queue->pop_front();
/* Pop at most one work item. */
break;
}
if (work_reference.work) {
check_finalization(queue);
}
pthread_mutex_unlock(&queue->mutex);
return work;
return work_reference.work;
}
int BLI_thread_queue_len(ThreadQueue *queue)
@@ -748,7 +830,8 @@ int BLI_thread_queue_len(ThreadQueue *queue)
int size;
pthread_mutex_lock(&queue->mutex);
size = BLI_gsqueue_len(queue->queue);
size = queue->queue_low_priority.size() + queue->queue_normal_priority.size() +
queue->queue_high_priority.size();
pthread_mutex_unlock(&queue->mutex);
return size;
@@ -759,7 +842,8 @@ bool BLI_thread_queue_is_empty(ThreadQueue *queue)
bool is_empty;
pthread_mutex_lock(&queue->mutex);
is_empty = BLI_gsqueue_is_empty(queue->queue);
is_empty = queue->queue_low_priority.empty() && queue->queue_normal_priority.empty() &&
queue->queue_high_priority.empty();
pthread_mutex_unlock(&queue->mutex);
return is_empty;
@@ -781,7 +865,9 @@ void BLI_thread_queue_wait_finish(ThreadQueue *queue)
/* wait for finish condition */
pthread_mutex_lock(&queue->mutex);
while (!BLI_gsqueue_is_empty(queue->queue)) {
while (!queue->queue_low_priority.empty() || !queue->queue_normal_priority.empty() ||
!queue->queue_high_priority.empty())
{
pthread_cond_wait(&queue->finish_cond, &queue->mutex);
}
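
Both pop functions scan the high-, normal- and low-priority deques in that order, so consumers
observe strict priority ordering with FIFO order inside each band, and cancellation only
affects work that has not been popped yet. A single-threaded sketch of the resulting semantics
(editorial, not part of this commit; nowait is set up front so the pops never block):

static void ordering_example()
{
  ThreadQueue *queue = BLI_thread_queue_init();
  /* Make pops on an empty queue return nullptr instead of blocking. */
  BLI_thread_queue_nowait(queue);

  int a = 0, b = 0, c = 0, d = 0;
  BLI_thread_queue_push(queue, &a, BLI_THREAD_QUEUE_WORK_PRIORITY_LOW);
  const uint64_t id_b = BLI_thread_queue_push(queue, &b, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
  BLI_thread_queue_push(queue, &c, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
  BLI_thread_queue_push(queue, &d, BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH);

  /* b is still pending, so it can be removed before any consumer sees it. */
  BLI_thread_queue_cancel_work(queue, id_b);

  /* Priority first, FIFO within a band: d, then c, then a. */
  void *first = BLI_thread_queue_pop(queue);  /* &d */
  void *second = BLI_thread_queue_pop(queue); /* &c */
  void *third = BLI_thread_queue_pop(queue);  /* &a */
  void *fourth = BLI_thread_queue_pop(queue); /* nullptr: empty queue + nowait. */
  BLI_assert(first == &d && second == &c && third == &a && fourth == nullptr);
  UNUSED_VARS_NDEBUG(first, second, third, fourth);

  BLI_thread_queue_free(queue);
}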


@@ -1832,7 +1832,8 @@ void PreviewLoadJob::push_load_request(PreviewImage *preview, const eIconSizes i
preview->runtime->tag |= PRV_TAG_DEFFERED_RENDERING;
requested_previews_.emplace_back(preview, icon_size);
BLI_thread_queue_push(todo_queue_, &requested_previews_.back());
BLI_thread_queue_push(
todo_queue_, &requested_previews_.back(), BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
}
void PreviewLoadJob::run_fn(void *customdata, wmJobWorkerStatus *worker_status)


@@ -1563,7 +1563,7 @@ static void filelist_cache_preview_runf(TaskPool *__restrict pool, void *taskdat
/* Move ownership to the done queue. */
preview_taskdata->preview = nullptr;
BLI_thread_queue_push(cache->previews_done, preview);
BLI_thread_queue_push(cache->previews_done, preview, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
// printf("%s: End (%d)...\n", __func__, threadid);
}
@@ -1713,7 +1713,7 @@ static bool filelist_cache_previews_push(FileList *filelist, FileDirEntry *entry
if (imbuf) {
preview->icon_id = BKE_icon_imbuf_create(imbuf);
}
BLI_thread_queue_push(cache->previews_done, preview);
BLI_thread_queue_push(cache->previews_done, preview, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
}
else {
if (entry->redirection_path) {


@@ -545,7 +545,8 @@ void VKDevice::context_unregister(VKContext &context)
BLI_assert_msg(render_graph.is_empty(),
"Unregistering a context that still has an unsubmitted render graph.");
render_graph.reset();
BLI_thread_queue_push(unused_render_graphs_, &render_graph);
BLI_thread_queue_push(
unused_render_graphs_, &render_graph, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
}
{
std::scoped_lock lock(orphaned_data.mutex_get());


@@ -53,7 +53,8 @@ TimelineValue VKDevice::render_graph_submit(render_graph::VKRenderGraph *render_
{
if (render_graph->is_empty()) {
render_graph->reset();
BLI_thread_queue_push(unused_render_graphs_, render_graph);
BLI_thread_queue_push(
unused_render_graphs_, render_graph, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
return 0;
}
@@ -79,7 +80,8 @@ TimelineValue VKDevice::render_graph_submit(render_graph::VKRenderGraph *render_
timeline = submit_task->timeline = submit_to_device ? ++timeline_value_ : timeline_value_ + 1;
orphaned_data.timeline_ = timeline;
orphaned_data.move_data(context_discard_pool, timeline);
BLI_thread_queue_push(submitted_render_graphs_, submit_task);
BLI_thread_queue_push(
submitted_render_graphs_, submit_task, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
}
submit_task = nullptr;
@@ -270,7 +272,9 @@ void VKDevice::submission_runner(TaskPool *__restrict pool, void *task_data)
}
render_graph.reset();
BLI_thread_queue_push(device->unused_render_graphs_, std::move(submit_task->render_graph));
BLI_thread_queue_push(device->unused_render_graphs_,
std::move(submit_task->render_graph),
BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL);
MEM_delete<VKRenderGraphSubmitTask>(submit_task);
}
CLOG_INFO(&LOG, 3, "submission runner is being canceled");