diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h index b6dc6681441..d91af85c64c 100644 --- a/source/blender/blenlib/BLI_threads.h +++ b/source/blender/blenlib/BLI_threads.h @@ -160,16 +160,75 @@ void BLI_condition_end(ThreadCondition *cond); typedef struct ThreadQueue ThreadQueue; +typedef enum { + BLI_THREAD_QUEUE_WORK_PRIORITY_LOW, + BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL, + BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH, +} ThreadQueueWorkPriority; + +/** + * Allocate a new ThreadQueue. + */ ThreadQueue *BLI_thread_queue_init(void); + +/** + * Deallocate the ThreadQueue. + * Assumes no one is using the queue anymore. + */ void BLI_thread_queue_free(ThreadQueue *queue); -void BLI_thread_queue_push(ThreadQueue *queue, void *work); +/** + * Push one work pointer to the queue. + * Higher priority works always take priority over lower priority ones, regardless of their + * insertion order. Works within the same priority follow FIFO order. + * + * \returns a unique work_id that can be used later for canceling the work before it's popped out + * from the queue. + */ +uint64_t BLI_thread_queue_push(ThreadQueue *queue, void *work, ThreadQueueWorkPriority priority); + +/** + * Remove the corresponding work from the queue. + */ +void BLI_thread_queue_cancel_work(ThreadQueue *queue, uint64_t work_id); + +/** + * Pop the oldest, highest priority work from the queue. + * Blocks the calling thread unless BLI_thread_queue_nowait has been called for this queue. + * + * \returns nullptr if nowait has been set and the queue is empty. Otherwise returns a work + * pointer. + */ void *BLI_thread_queue_pop(ThreadQueue *queue); + +/** + * Try to pop the oldest, highest priority work from the queue. + * Blocks the calling thread unless BLI_thread_queue_nowait has been called for this queue. + * + * \returns nullptr if time runs out, or if nowait has been set and the queue is empty. Otherwise + * returns a work pointer. + */ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms); + +/** + * \returns the total amount of pending works still in the queue. + */ int BLI_thread_queue_len(ThreadQueue *queue); + +/** + * \returns true if there are no pending works in the queue. + */ bool BLI_thread_queue_is_empty(ThreadQueue *queue); +/** + * Blocks the calling thread until the queue is empty. + */ void BLI_thread_queue_wait_finish(ThreadQueue *queue); + +/** + * After calling this function, BLI_thread_queue_pop and BLI_thread_queue_pop_timeout won't block + * the calling thread even when the queue is empty. + */ void BLI_thread_queue_nowait(ThreadQueue *queue); /* Thread local storage */ diff --git a/source/blender/blenlib/intern/task_pool.cc b/source/blender/blenlib/intern/task_pool.cc index c45188cafe5..72a7d1edde2 100644 --- a/source/blender/blenlib/intern/task_pool.cc +++ b/source/blender/blenlib/intern/task_pool.cc @@ -172,8 +172,10 @@ struct TaskPool { ThreadQueue *background_queue; volatile bool background_is_canceling = false; + eTaskPriority priority; + TaskPool(const TaskPoolType type, const eTaskPriority priority, void *userdata) - : type(type), userdata(userdata) + : type(type), userdata(userdata), priority(priority) { this->use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS; @@ -196,8 +198,6 @@ struct TaskPool { if (use_threads) { this->tbb_group = std::make_unique(priority); } -#else - UNUSED_VARS(priority); #endif break; } @@ -423,7 +423,11 @@ void TaskPool::background_task_pool_run(Task &&task) BLI_assert(ELEM(this->type, TASK_POOL_BACKGROUND, TASK_POOL_BACKGROUND_SERIAL)); Task *task_mem = MEM_new(__func__, std::move(task)); - BLI_thread_queue_push(this->background_queue, task_mem); + BLI_thread_queue_push(this->background_queue, + task_mem, + this->priority == TASK_PRIORITY_HIGH ? + BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH : + BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); if (BLI_available_threads(&this->background_threads)) { BLI_threadpool_insert(&this->background_threads, this); diff --git a/source/blender/blenlib/intern/threads.cc b/source/blender/blenlib/intern/threads.cc index 3797eb573bb..f15ab16a747 100644 --- a/source/blender/blenlib/intern/threads.cc +++ b/source/blender/blenlib/intern/threads.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "MEM_guardedalloc.h" @@ -604,21 +605,26 @@ void BLI_condition_end(ThreadCondition *cond) /* ************************************************ */ +struct ThreadQueueWork { + void *work; + uint64_t id; +}; + struct ThreadQueue { - GSQueue *queue; + uint64_t current_id = 0; + std::deque queue_low_priority; + std::deque queue_normal_priority; + std::deque queue_high_priority; pthread_mutex_t mutex; pthread_cond_t push_cond; pthread_cond_t finish_cond; - volatile int nowait; - volatile int canceled; + volatile int nowait = 0; + volatile int canceled = 0; }; ThreadQueue *BLI_thread_queue_init() { - ThreadQueue *queue; - - queue = MEM_callocN("ThreadQueue"); - queue->queue = BLI_gsqueue_new(sizeof(void *)); + ThreadQueue *queue = MEM_new(__func__); pthread_mutex_init(&queue->mutex, nullptr); pthread_cond_init(&queue->push_cond, nullptr); @@ -634,44 +640,109 @@ void BLI_thread_queue_free(ThreadQueue *queue) pthread_cond_destroy(&queue->push_cond); pthread_mutex_destroy(&queue->mutex); - BLI_gsqueue_free(queue->queue); - - MEM_freeN(queue); + MEM_delete(queue); } -void BLI_thread_queue_push(ThreadQueue *queue, void *work) +uint64_t BLI_thread_queue_push(ThreadQueue *queue, void *work, ThreadQueueWorkPriority priority) { + BLI_assert(work); + pthread_mutex_lock(&queue->mutex); - BLI_gsqueue_push(queue->queue, &work); + ThreadQueueWork work_reference; + work_reference.work = work; + work_reference.id = ++queue->current_id; + + switch (priority) { + case BLI_THREAD_QUEUE_WORK_PRIORITY_LOW: + queue->queue_low_priority.push_back(work_reference); + break; + case BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL: + queue->queue_normal_priority.push_back(work_reference); + break; + case BLI_THREAD_QUEUE_WORK_PRIORITY_HIGH: + queue->queue_high_priority.push_back(work_reference); + break; + } /* signal threads waiting to pop */ pthread_cond_signal(&queue->push_cond); pthread_mutex_unlock(&queue->mutex); + + return work_reference.id; +} + +/** WARNING: Assumes the queue is already locked. */ +static void check_finalization(ThreadQueue *queue) +{ + if (queue->queue_low_priority.empty() && queue->queue_normal_priority.empty() && + queue->queue_high_priority.empty()) + { + pthread_cond_signal(&queue->finish_cond); + } +} + +void BLI_thread_queue_cancel_work(ThreadQueue *queue, uint64_t work_id) +{ + pthread_mutex_lock(&queue->mutex); + + bool found = false; + auto check = [&](const ThreadQueueWork &work) { + if (work.id == work_id) { + found = true; + return true; + } + return false; + }; + + auto cancel = [&](std::deque &sub_queue) { + sub_queue.erase(std::remove_if(sub_queue.begin(), sub_queue.end(), check), sub_queue.end()); + }; + + cancel(queue->queue_low_priority); + cancel(queue->queue_normal_priority); + cancel(queue->queue_high_priority); + + if (found) { + check_finalization(queue); + } + + pthread_mutex_unlock(&queue->mutex); } void *BLI_thread_queue_pop(ThreadQueue *queue) { - void *work = nullptr; + ThreadQueueWork work_reference = {0}; /* wait until there is work */ pthread_mutex_lock(&queue->mutex); - while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) { + while (!queue->nowait && queue->queue_low_priority.empty() && + queue->queue_normal_priority.empty() && queue->queue_high_priority.empty()) + { pthread_cond_wait(&queue->push_cond, &queue->mutex); } /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) { - BLI_gsqueue_pop(queue->queue, &work); - - if (BLI_gsqueue_is_empty(queue->queue)) { - pthread_cond_broadcast(&queue->finish_cond); + for (std::deque *sub_queue : + {&queue->queue_high_priority, &queue->queue_normal_priority, &queue->queue_low_priority}) + { + if (sub_queue->empty()) { + continue; } + work_reference = sub_queue->front(); + sub_queue->pop_front(); + + /* Don't pop more than one work. */ + break; + } + + if (work_reference.work) { + check_finalization(queue); } pthread_mutex_unlock(&queue->mutex); - return work; + return work_reference.work; } static void wait_timeout(timespec *timeout, int ms) @@ -712,7 +783,7 @@ static void wait_timeout(timespec *timeout, int ms) void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) { double t; - void *work = nullptr; + ThreadQueueWork work_reference = {0}; timespec timeout; t = BLI_time_now_seconds(); @@ -720,7 +791,9 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) /* wait until there is work */ pthread_mutex_lock(&queue->mutex); - while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) { + while (!queue->nowait && queue->queue_low_priority.empty() && + queue->queue_normal_priority.empty() && queue->queue_high_priority.empty()) + { if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT) { break; } @@ -730,17 +803,26 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms) } /* if we have something, pop it */ - if (!BLI_gsqueue_is_empty(queue->queue)) { - BLI_gsqueue_pop(queue->queue, &work); - - if (BLI_gsqueue_is_empty(queue->queue)) { - pthread_cond_broadcast(&queue->finish_cond); + for (std::deque *sub_queue : + {&queue->queue_high_priority, &queue->queue_normal_priority, &queue->queue_low_priority}) + { + if (sub_queue->empty()) { + continue; } + work_reference = sub_queue->front(); + sub_queue->pop_front(); + + /* Don't pop more than one work. */ + break; + } + + if (work_reference.work) { + check_finalization(queue); } pthread_mutex_unlock(&queue->mutex); - return work; + return work_reference.work; } int BLI_thread_queue_len(ThreadQueue *queue) @@ -748,7 +830,8 @@ int BLI_thread_queue_len(ThreadQueue *queue) int size; pthread_mutex_lock(&queue->mutex); - size = BLI_gsqueue_len(queue->queue); + size = queue->queue_low_priority.size() + queue->queue_normal_priority.size() + + queue->queue_high_priority.size(); pthread_mutex_unlock(&queue->mutex); return size; @@ -759,7 +842,8 @@ bool BLI_thread_queue_is_empty(ThreadQueue *queue) bool is_empty; pthread_mutex_lock(&queue->mutex); - is_empty = BLI_gsqueue_is_empty(queue->queue); + is_empty = queue->queue_low_priority.empty() && queue->queue_normal_priority.empty() && + queue->queue_high_priority.empty(); pthread_mutex_unlock(&queue->mutex); return is_empty; @@ -781,7 +865,9 @@ void BLI_thread_queue_wait_finish(ThreadQueue *queue) /* wait for finish condition */ pthread_mutex_lock(&queue->mutex); - while (!BLI_gsqueue_is_empty(queue->queue)) { + while (!queue->queue_low_priority.empty() || !queue->queue_normal_priority.empty() || + !queue->queue_high_priority.empty()) + { pthread_cond_wait(&queue->finish_cond, &queue->mutex); } diff --git a/source/blender/editors/render/render_preview.cc b/source/blender/editors/render/render_preview.cc index ac522b55e9c..812909fc32c 100644 --- a/source/blender/editors/render/render_preview.cc +++ b/source/blender/editors/render/render_preview.cc @@ -1832,7 +1832,8 @@ void PreviewLoadJob::push_load_request(PreviewImage *preview, const eIconSizes i preview->runtime->tag |= PRV_TAG_DEFFERED_RENDERING; requested_previews_.emplace_back(preview, icon_size); - BLI_thread_queue_push(todo_queue_, &requested_previews_.back()); + BLI_thread_queue_push( + todo_queue_, &requested_previews_.back(), BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); } void PreviewLoadJob::run_fn(void *customdata, wmJobWorkerStatus *worker_status) diff --git a/source/blender/editors/space_file/filelist.cc b/source/blender/editors/space_file/filelist.cc index 20a9e20c49f..1eb4945d956 100644 --- a/source/blender/editors/space_file/filelist.cc +++ b/source/blender/editors/space_file/filelist.cc @@ -1563,7 +1563,7 @@ static void filelist_cache_preview_runf(TaskPool *__restrict pool, void *taskdat /* Move ownership to the done queue. */ preview_taskdata->preview = nullptr; - BLI_thread_queue_push(cache->previews_done, preview); + BLI_thread_queue_push(cache->previews_done, preview, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); // printf("%s: End (%d)...\n", __func__, threadid); } @@ -1713,7 +1713,7 @@ static bool filelist_cache_previews_push(FileList *filelist, FileDirEntry *entry if (imbuf) { preview->icon_id = BKE_icon_imbuf_create(imbuf); } - BLI_thread_queue_push(cache->previews_done, preview); + BLI_thread_queue_push(cache->previews_done, preview, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); } else { if (entry->redirection_path) { diff --git a/source/blender/gpu/vulkan/vk_device.cc b/source/blender/gpu/vulkan/vk_device.cc index 42c0448ca88..c347ca2f8ca 100644 --- a/source/blender/gpu/vulkan/vk_device.cc +++ b/source/blender/gpu/vulkan/vk_device.cc @@ -545,7 +545,8 @@ void VKDevice::context_unregister(VKContext &context) BLI_assert_msg(render_graph.is_empty(), "Unregistering a context that still has an unsubmitted render graph."); render_graph.reset(); - BLI_thread_queue_push(unused_render_graphs_, &render_graph); + BLI_thread_queue_push( + unused_render_graphs_, &render_graph, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); } { std::scoped_lock lock(orphaned_data.mutex_get()); diff --git a/source/blender/gpu/vulkan/vk_device_submission.cc b/source/blender/gpu/vulkan/vk_device_submission.cc index cb7036a82d8..faa950ac6e3 100644 --- a/source/blender/gpu/vulkan/vk_device_submission.cc +++ b/source/blender/gpu/vulkan/vk_device_submission.cc @@ -53,7 +53,8 @@ TimelineValue VKDevice::render_graph_submit(render_graph::VKRenderGraph *render_ { if (render_graph->is_empty()) { render_graph->reset(); - BLI_thread_queue_push(unused_render_graphs_, render_graph); + BLI_thread_queue_push( + unused_render_graphs_, render_graph, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); return 0; } @@ -79,7 +80,8 @@ TimelineValue VKDevice::render_graph_submit(render_graph::VKRenderGraph *render_ timeline = submit_task->timeline = submit_to_device ? ++timeline_value_ : timeline_value_ + 1; orphaned_data.timeline_ = timeline; orphaned_data.move_data(context_discard_pool, timeline); - BLI_thread_queue_push(submitted_render_graphs_, submit_task); + BLI_thread_queue_push( + submitted_render_graphs_, submit_task, BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); } submit_task = nullptr; @@ -270,7 +272,9 @@ void VKDevice::submission_runner(TaskPool *__restrict pool, void *task_data) } render_graph.reset(); - BLI_thread_queue_push(device->unused_render_graphs_, std::move(submit_task->render_graph)); + BLI_thread_queue_push(device->unused_render_graphs_, + std::move(submit_task->render_graph), + BLI_THREAD_QUEUE_WORK_PRIORITY_NORMAL); MEM_delete(submit_task); } CLOG_INFO(&LOG, 3, "submission runner is being canceled");