ThreadedWorker

New functions to easily dispatch work to a limited number of thread, transparently.

NOTE: Could be merged in trunk, if needed.
This commit is contained in:
Martin Poirier
2008-08-16 22:47:33 +00:00
parent cc3b41b3cd
commit e0722e0923
2 changed files with 125 additions and 1 deletions

View File

@@ -39,7 +39,6 @@
#define BLENDER_MAX_THREADS 8
struct ListBase;
void BLI_init_threads (struct ListBase *threadbase, void *(*do_thread)(void *), int tot);
int BLI_available_threads(struct ListBase *threadbase);
int BLI_available_thread_index(struct ListBase *threadbase);
@@ -52,5 +51,28 @@ void BLI_lock_thread (int type);
void BLI_unlock_thread (int type);
int BLI_system_thread_count( void ); /* gets the number of threads the system can make use of */
/* ThreadedWorker is a simple tool for dispatching work to a limited number of threads in a transparent
* fashion from the caller's perspective
* */
struct ThreadedWorker;
/* Create a new worker supporting tot parallel threads.
* When new work in inserted and all threads are busy, sleep(sleep_time) before checking again
*/
struct ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time);
/* join all working threads */
void BLI_end_worker(struct ThreadedWorker *worker);
/* also ends all working threads */
void BLI_destroy_worker(struct ThreadedWorker *worker);
/* Spawns a new work thread if possible, sleeps until one is available otherwise
* NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */
void BLI_insert_work(struct ThreadedWorker *worker, void *param);
#endif

View File

@@ -38,6 +38,8 @@
#include "BLI_blenlib.h"
#include "BLI_threads.h"
#include "PIL_time.h"
/* for checking system threads - BLI_system_thread_count */
#ifdef WIN32
#include "Windows.h"
@@ -280,4 +282,104 @@ int BLI_system_thread_count( void )
return t;
}
/* ************************************************ */
typedef struct ThreadedWorker {
ListBase threadbase;
void *(*work_fnct)(void *);
char busy[RE_MAX_THREAD];
int total;
int sleep_time;
} ThreadedWorker;
typedef struct WorkParam {
ThreadedWorker *worker;
void *param;
int index;
} WorkParam;
void *exec_work_fnct(void *v_param)
{
WorkParam *p = (WorkParam*)v_param;
void *value;
value = p->worker->work_fnct(p->param);
p->worker->busy[p->index] = 0;
MEM_freeN(p);
return value;
}
ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time)
{
ThreadedWorker *worker;
worker = MEM_callocN(sizeof(ThreadedWorker), "threadedworker");
if (tot > RE_MAX_THREAD)
{
tot = RE_MAX_THREAD;
}
else if (tot < 1)
{
tot= 1;
}
worker->total = tot;
worker->work_fnct = do_thread;
BLI_init_threads(&worker->threadbase, exec_work_fnct, tot);
return worker;
}
void BLI_end_worker(ThreadedWorker *worker)
{
BLI_end_threads(&worker->threadbase);
}
void BLI_destroy_worker(ThreadedWorker *worker)
{
BLI_end_worker(worker);
BLI_freelistN(&worker->threadbase);
MEM_freeN(worker);
}
void BLI_insert_work(ThreadedWorker *worker, void *param)
{
WorkParam *p = MEM_callocN(sizeof(WorkParam), "workparam");
int index;
if (BLI_available_threads(&worker->threadbase) == 0)
{
index = worker->total;
while(index == worker->total)
{
PIL_sleep_ms(worker->sleep_time);
for (index = 0; index < worker->total; index++)
{
if (worker->busy[index] == 0)
{
BLI_remove_thread_index(&worker->threadbase, index);
break;
}
}
}
}
else
{
index = BLI_available_thread_index(&worker->threadbase);
}
worker->busy[index] = 1;
p->param = param;
p->index = index;
p->worker = worker;
BLI_insert_thread(&worker->threadbase, p);
}
/* eof */