path: root/build2/scheduler.cxx
Diffstat (limited to 'build2/scheduler.cxx')
-rw-r--r--  build2/scheduler.cxx  |  202
1 file changed, 120 insertions, 82 deletions
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index 1793ab2..05816b8 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -10,69 +10,118 @@ using namespace std;
namespace build2
{
- void scheduler::
- wait (size_t start_count, const atomic_count& task_count, bool work_queue)
+ size_t scheduler::
+ wait (size_t start_count, const atomic_count& task_count, work_queue wq)
{
- if (task_count <= start_count)
- return;
+ // Note that task_count is a synchronization point.
+ //
+ size_t tc;
+
+ if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+ return tc;
assert (max_active_ != 1); // Serial execution, nobody to wait for.
// See if we can run some of our own tasks.
//
- if (work_queue)
+ if (wq != work_none)
{
// If we are waiting on someone else's task count, then there might still
- // be no queue which is set by async().
+ // be no queue (set by async()).
//
if (task_queue* tq = task_queue_)
{
for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
+ {
pop_back (*tq, ql);
+ if (wq == work_one)
+ {
+ if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+ return tc;
+ }
+ }
+
// Note that an empty task queue doesn't automatically mean the task
// count has been decremented (some might still be executing
// asynchronously).
//
- if (task_count <= start_count)
- return;
+ if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+ return tc;
}
}
- suspend (start_count, task_count);
+ return suspend (start_count, task_count);
}
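The reworked wait() now returns the task count value that satisfied the
wait and takes a work_queue mode controlling how much of the caller's own
queue it may work off while waiting. A minimal usage sketch, assuming a
scheduler instance named sched and using the work_one mode seen above:

  // Start a group of asynchronous tasks and wait for them, helping
  // with at most one of our own queued tasks between count checks.
  //
  atomic_count task_count (0);
  size_t start (task_count.load (memory_order_acquire));

  // ... several sched.async () calls, each incrementing task_count ...

  size_t tc (sched.wait (start, task_count, scheduler::work_one));
  assert (tc <= start); // Every task in the group has completed.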
void scheduler::
- suspend (size_t start_count, const atomic_count& tc)
+ deactivate ()
{
- wait_slot& s (
- wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
+ if (max_active_ == 1) // Serial execution.
+ return;
- // This thread is no longer active.
+ lock l (mutex_);
+
+ active_--;
+ waiting_++;
+
+ if (waiting_ > stat_max_waiters_)
+ stat_max_waiters_ = waiting_;
+
+ // A spare active thread has become available. If there are ready
+ // masters or eager helpers, wake someone up.
//
- {
- lock l (mutex_);
+ if (ready_ != 0)
+ ready_condv_.notify_one ();
+ else if (queued_task_count_.load (memory_order_consume) != 0)
+ activate_helper (l);
+ }
- active_--;
- waiting_++;
+ void scheduler::
+ activate (bool collision)
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
- if (waiting_ > stat_max_waiters_)
- stat_max_waiters_ = waiting_;
+ lock l (mutex_);
+ waiting_--;
- // A spare active thread has become available. If there are ready
- // masters or eager helpers, wake someone up.
- //
- if (ready_ != 0)
- ready_condv_.notify_one ();
- else if (queued_task_count_ != 0)
- activate_helper (l);
- }
+ if (collision)
+ stat_wait_collisions_++;
+
+ // If there is a spare active slot, become active. Otherwise, wait in
+ // the ready queue.
+ //
+ ready_++;
+
+ while (!shutdown_ && active_ >= max_active_)
+ ready_condv_.wait (l);
+
+ ready_--;
+
+ if (shutdown_)
+ throw system_error (ECANCELED, system_category ());
+
+ active_++;
+ }
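Splitting deactivate()/activate() out of suspend() also makes the pair
usable on its own. A sketch of the assumed pattern for a thread about to
block on something the scheduler cannot see (the blocking call is
hypothetical):

  sched.deactivate ();                     // Give up our active slot.
  wait_for_external_event ();              // Hypothetical blocking call.
  sched.activate (false /* collision */);  // Reacquire an active slot.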
+
+ size_t scheduler::
+ suspend (size_t start_count, const atomic_count& task_count)
+ {
+ wait_slot& s (
+ wait_queue_[
+ hash<const atomic_count*> () (&task_count) % wait_queue_size_]);
+
+ // This thread is no longer active.
+ //
+ deactivate ();
// Note that the task count is checked while holding the lock. We also
// have to notify while holding the lock (see resume()). The aim here
// is not to end up with a notification that happens between the check
// and the wait.
//
+ size_t tc (0);
bool collision;
{
lock l (s.mutex);
@@ -80,20 +129,21 @@ namespace build2
// We have a collision if there is already a waiter for a different
// task count.
//
- collision = (s.waiters++ != 0 && s.tcount != &tc);
+ collision = (s.waiters++ != 0 && s.task_count != &task_count);
// This is nuanced: we want to always have the task count of the last
// thread to join the queue. Otherwise, if threads are leaving and
// joining the queue simultaneously, we may end up with a task count of
// a thread group that is no longer waiting.
//
- s.tcount = &tc;
+ s.task_count = &task_count;
// We could probably relax the atomic access since we use a mutex for
// synchronization, though this has a different tradeoff (calling wait
// because we don't yet see the updated count).
//
- while (!(s.shutdown || tc <= start_count))
+ while (!(s.shutdown ||
+ (tc = task_count.load (memory_order_acquire)) <= start_count))
s.condv.wait (l);
s.waiters--;
@@ -101,28 +151,9 @@ namespace build2
// This thread is no longer waiting.
//
- {
- lock l (mutex_);
- waiting_--;
-
- if (collision)
- stat_wait_collisions_++;
-
- // If we have spare active threads, then become active. Otherwise it
- // enters the ready queue.
- //
- ready_++;
-
- while (!shutdown_ && active_ >= max_active_)
- ready_condv_.wait (l);
-
- ready_--;
-
- if (shutdown_)
- throw system_error (ECANCELED, system_category ());
+ activate (collision);
- active_++;
- }
+ return tc;
}
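For reference, the wait_slot members used by suspend() above (mutex,
condv, waiters, task_count, shutdown) suggest a shape along these lines;
a reconstruction, not the actual declaration:

  struct wait_slot
  {
    std::mutex mutex;
    std::condition_variable condv;
    size_t waiters = 0;
    bool shutdown = false;          // Set on shutdown to wake everyone.
    const atomic_count* task_count; // Count of the last thread to join.
  };

Since distinct task counts can hash to the same slot, a waiter that finds
the slot occupied by a different count records a collision, and every
waiter re-checks its own count on wakeup.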
void scheduler::
@@ -151,7 +182,7 @@ namespace build2
size_t scheduler::
shard_size (size_t mul, size_t div) const
{
- size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 2);
+ size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4);
// Experience shows that we want something close to 2x for small numbers,
// then reduce to 1.5x in-between, and 1x for large ones.
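To make the new divisor concrete with hypothetical numbers: for
max_threads_ == 64 and mul == 1, div == 1, the base size drops from
64 / 2 == 32 to 64 / 4 == 16, which the adjustment described in the
comment above then nudges toward the 2x/1.5x/1x targets.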
@@ -208,11 +239,11 @@ namespace build2
//
lock l (mutex_);
- // Use 8x max_active on 32-bit and 16x max_active on 64-bit. Unless we
+ // Use 16x max_active on 32-bit and 32x max_active on 64-bit. Unless we
// were asked to run serially.
//
if (max_threads == 0)
- max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*) * 2);
+ max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*) * 4);
assert (shutdown_ &&
init_active != 0 &&
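Concretely, sizeof (void*) is 4 on a 32-bit target and 8 on a 64-bit one,
so the factor of 4 yields the 16x and 32x max_active defaults mentioned
in the comment; for example, max_active == 8 on a 64-bit build now
defaults to max_threads == 256 where the old factor of 2 gave 128.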
@@ -229,10 +260,7 @@ namespace build2
//
task_queue_depth_ = queue_depth != 0
? queue_depth
- : max_active * sizeof (void*) * 2;
-
- if (max_active != 1)
- task_queues_.reserve (max_threads_);
+ : max_active * sizeof (void*) * 4;
queued_task_count_.store (0, memory_order_relaxed);
@@ -289,7 +317,11 @@ namespace build2
if (!shutdown_)
{
- // Signal shutdown and collect statistics.
+ // Collect statistics.
+ //
+ r.thread_helpers = helpers_;
+
+ // Signal shutdown.
//
shutdown_ = true;
@@ -300,18 +332,16 @@ namespace build2
ws.shutdown = true;
}
- for (unique_ptr<task_queue>& tq: task_queues_)
+ for (task_queue& tq: task_queues_)
{
- lock ql (tq->mutex);
- r.task_queue_full += tq->stat_full;
- tq->shutdown = true;
+ lock ql (tq.mutex);
+ r.task_queue_full += tq.stat_full;
+ tq.shutdown = true;
}
// Wait for all the helpers to terminate, waking up any thread that
// is sleeping.
//
- r.thread_helpers = helpers_;
-
while (helpers_ != 0)
{
bool i (idle_ != 0);
@@ -344,6 +374,7 @@ namespace build2
r.thread_max_waiting = stat_max_waiters_;
r.task_queue_depth = task_queue_depth_;
+ r.task_queue_remain = queued_task_count_.load (memory_order_consume);
r.wait_queue_slots = wait_queue_size_;
r.wait_queue_collisions = stat_wait_collisions_;
@@ -359,8 +390,19 @@ namespace build2
{
if (idle_ != 0)
idle_condv_.notify_one ();
- else if (init_active_ + helpers_ < max_threads_)
+ else if (init_active_ + helpers_ < max_threads_ ||
+ //
+ // Ignore the max_threads value if we have queued tasks but no
+ // active threads. This means everyone is waiting for something
+ // to happen but nobody is doing anything (e.g., working the
+ // queues). This, for example, can happen if a thread waits for
+ // a task that is in its queue but is below the mark.
+ //
+ (active_ == 0 &&
+ queued_task_count_.load (memory_order_consume) != 0))
+ {
create_helper (l);
+ }
}
}
@@ -412,19 +454,18 @@ namespace build2
{
s.active_++;
- while (s.queued_task_count_ != 0)
+ while (s.queued_task_count_.load (memory_order_consume) != 0)
{
- // Queues are never removed and there shouldn't be any reallocations
- // since we reserve maximum possible size upfront. Which means we
- // can get the current number of queues and release the main lock
- // while examining each of them.
+ // Queues are never removed, which means we can get the current range
+ // and release the main lock while examining each of them.
//
- size_t n (s.task_queues_.size ());
+ auto it (s.task_queues_.begin ());
+ size_t n (s.task_queues_.size ()); // Not end(): queues may be appended.
l.unlock ();
for (size_t i (0); i != n; ++i)
{
- task_queue& tq (*s.task_queues_[i]);
+ task_queue& tq (*it++);
for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
s.pop_front (tq, ql);
@@ -465,18 +506,15 @@ namespace build2
// Note that task_queue_depth is immutable between startup() and
// shutdown() (but see join()).
//
- unique_ptr<task_queue> tqp (new task_queue (task_queue_depth_));
- task_queue& tq (*tqp);
-
+ task_queue* tq;
{
lock l (mutex_);
- tq.shutdown = shutdown_;
- task_queues_.push_back (move (tqp));
+ task_queues_.emplace_back (task_queue_depth_);
+ tq = &task_queues_.back ();
+ tq->shutdown = shutdown_;
}
- task_queue_ = &tq;
- return tq;
+ task_queue_ = tq;
+ return *tq;
}
-
- scheduler sched;
}
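Holding task_queue by value, caching its address across the unlock in
create_queue(), and walking the container without the main lock in the
helper loop all require elements that never move as the container grows.
The member's new type is not visible in this diff; a declaration along
these lines (an assumption) would provide the required reference
stability:

  // Elements are never moved or removed, so raw pointers and iterators
  // remain valid while new queues are emplaced at the back.
  //
  std::list<task_queue> task_queues_;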