diff --git a/src/gallium/auxiliary/util/u_queue.c b/src/gallium/auxiliary/util/u_queue.c index de0422a0c13..775cb73de43 100644 --- a/src/gallium/auxiliary/util/u_queue.c +++ b/src/gallium/auxiliary/util/u_queue.c @@ -71,17 +71,25 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, input) while (1) { struct util_queue_job job; - pipe_semaphore_wait(&queue->queued); - if (queue->kill_threads) - break; - pipe_mutex_lock(queue->lock); + assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); + + /* wait if the queue is empty */ + while (!queue->kill_threads && queue->num_queued == 0) + pipe_condvar_wait(queue->has_queued_cond, queue->lock); + + if (queue->kill_threads) { + pipe_mutex_unlock(queue->lock); + break; + } + job = queue->jobs[queue->read_idx]; queue->jobs[queue->read_idx].job = NULL; queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; - pipe_mutex_unlock(queue->lock); - pipe_semaphore_signal(&queue->has_space); + queue->num_queued--; + pipe_condvar_signal(queue->has_space_cond); + pipe_mutex_unlock(queue->lock); if (job.job) { queue->execute_job(job.job, thread_index); @@ -122,8 +130,10 @@ util_queue_init(struct util_queue *queue, queue->execute_job = execute_job; pipe_mutex_init(queue->lock); - pipe_semaphore_init(&queue->has_space, max_jobs); - pipe_semaphore_init(&queue->queued, 0); + + queue->num_queued = 0; + pipe_condvar_init(queue->has_queued_cond); + pipe_condvar_init(queue->has_space_cond); queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread)); if (!queue->threads) @@ -156,8 +166,8 @@ fail: FREE(queue->threads); if (queue->jobs) { - pipe_semaphore_destroy(&queue->has_space); - pipe_semaphore_destroy(&queue->queued); + pipe_condvar_destroy(queue->has_space_cond); + pipe_condvar_destroy(queue->has_queued_cond); pipe_mutex_destroy(queue->lock); FREE(queue->jobs); } @@ -172,17 +182,16 @@ util_queue_destroy(struct util_queue *queue) unsigned i; /* Signal all threads to terminate. */ - pipe_mutex_lock(queue->queued.mutex); + pipe_mutex_lock(queue->lock); queue->kill_threads = 1; - queue->queued.counter = queue->num_threads; - pipe_condvar_broadcast(queue->queued.cond); - pipe_mutex_unlock(queue->queued.mutex); + pipe_condvar_broadcast(queue->has_queued_cond); + pipe_mutex_unlock(queue->lock); for (i = 0; i < queue->num_threads; i++) pipe_thread_wait(queue->threads[i]); - pipe_semaphore_destroy(&queue->has_space); - pipe_semaphore_destroy(&queue->queued); + pipe_condvar_destroy(queue->has_space_cond); + pipe_condvar_destroy(queue->has_queued_cond); pipe_mutex_destroy(queue->lock); FREE(queue->jobs); FREE(queue->threads); @@ -214,15 +223,20 @@ util_queue_add_job(struct util_queue *queue, assert(fence->signalled); fence->signalled = false; - /* if the queue is full, wait until there is space */ - pipe_semaphore_wait(&queue->has_space); - pipe_mutex_lock(queue->lock); + assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); + + /* if the queue is full, wait until there is space */ + while (queue->num_queued == queue->max_jobs) + pipe_condvar_wait(queue->has_space_cond, queue->lock); + ptr = &queue->jobs[queue->write_idx]; assert(ptr->job == NULL); ptr->job = job; ptr->fence = fence; queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; + + queue->num_queued++; + pipe_condvar_signal(queue->has_queued_cond); pipe_mutex_unlock(queue->lock); - pipe_semaphore_signal(&queue->queued); } diff --git a/src/gallium/auxiliary/util/u_queue.h b/src/gallium/auxiliary/util/u_queue.h index f005ad5ef4c..750327e0279 100644 --- a/src/gallium/auxiliary/util/u_queue.h +++ b/src/gallium/auxiliary/util/u_queue.h @@ -53,9 +53,10 @@ struct util_queue_job { struct util_queue { const char *name; pipe_mutex lock; - pipe_semaphore has_space; - pipe_semaphore queued; + pipe_condvar has_queued_cond; + pipe_condvar has_space_cond; pipe_thread *threads; + int num_queued; unsigned num_threads; int kill_threads; int max_jobs;