From b37f1aa6398065be806e6605a023189685669885 Mon Sep 17 00:00:00 2001
From: Boris Kolpackov
Date: Wed, 15 Feb 2017 03:55:15 +0200
Subject: Implement parallel match

---
 build2/scheduler.cxx | 202 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 120 insertions(+), 82 deletions(-)

diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index 1793ab2..05816b8 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -10,69 +10,118 @@ using namespace std;
 
 namespace build2
 {
-  void scheduler::
-  wait (size_t start_count, const atomic_count& task_count, bool work_queue)
+  size_t scheduler::
+  wait (size_t start_count, const atomic_count& task_count, work_queue wq)
   {
-    if (task_count <= start_count)
-      return;
+    // Note that task_count is a synchronization point.
+    //
+    size_t tc;
+
+    if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+      return tc;
 
     assert (max_active_ != 1); // Serial execution, nobody to wait for.
 
     // See if we can run some of our own tasks.
     //
-    if (work_queue)
+    if (wq != work_none)
     {
       // If we are waiting on someone else's task count then there might still
-      // be no queue which is set by async().
+      // be no queue (set by async()).
       //
       if (task_queue* tq = task_queue_)
       {
         for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
+        {
           pop_back (*tq, ql);
 
+          if (wq == work_one)
+          {
+            if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+              return tc;
+          }
+        }
+
         // Note that an empty task queue doesn't automatically mean the task
         // count has been decremented (some might still be executing
         // asynchronously).
         //
-        if (task_count <= start_count)
-          return;
+        if ((tc = task_count.load (memory_order_acquire)) <= start_count)
+          return tc;
       }
     }
 
-    suspend (start_count, task_count);
+    return suspend (start_count, task_count);
   }
 
   void scheduler::
-  suspend (size_t start_count, const atomic_count& tc)
+  deactivate ()
   {
-    wait_slot& s (
-      wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
+    if (max_active_ == 1) // Serial execution.
+      return;
 
-    // This thread is no longer active.
+    lock l (mutex_);
+
+    active_--;
+    waiting_++;
+
+    if (waiting_ > stat_max_waiters_)
+      stat_max_waiters_ = waiting_;
+
+    // A spare active thread has become available. If there are ready
+    // masters or eager helpers, wake someone up.
     //
-    {
-      lock l (mutex_);
+    if (ready_ != 0)
+      ready_condv_.notify_one ();
+    else if (queued_task_count_.load (std::memory_order_consume) != 0)
+      activate_helper (l);
+  }
 
-      active_--;
-      waiting_++;
+  void scheduler::
+  activate (bool collision)
+  {
+    if (max_active_ == 1) // Serial execution.
+      return;
 
-      if (waiting_ > stat_max_waiters_)
-        stat_max_waiters_ = waiting_;
+    lock l (mutex_);
+    waiting_--;
 
-      // A spare active thread has become available. If there are ready
-      // masters or eager helpers, wake someone up.
-      //
-      if (ready_ != 0)
-        ready_condv_.notify_one ();
-      else if (queued_task_count_ != 0)
-        activate_helper (l);
-    }
+    if (collision)
+      stat_wait_collisions_++;
+
+    // If we have spare active threads, then become active. Otherwise we
+    // enter the ready queue.
+    //
+    ready_++;
+
+    while (!shutdown_ && active_ >= max_active_)
+      ready_condv_.wait (l);
+
+    ready_--;
+
+    if (shutdown_)
+      throw system_error (ECANCELED, system_category ());
+
+    active_++;
+  }
+
+  size_t scheduler::
+  suspend (size_t start_count, const atomic_count& task_count)
+  {
+    wait_slot& s (
+      wait_queue_[
+        hash<const atomic_count*> () (&task_count) % wait_queue_size_]);
+
+    // This thread is no longer active.
+    //
+    deactivate ();
 
     // Note that the task count is checked while holding the lock. We also
     // have to notify while holding the lock (see resume()). The aim here
     // is not to end up with a notification that happens between the check
     // and the wait.
     //
+    size_t tc (0);
     bool collision;
     {
       lock l (s.mutex);
@@ -80,20 +129,21 @@ namespace build2
       // We have a collision if there is already a waiter for a different
       // task count.
      //
-      collision = (s.waiters++ != 0 && s.tcount != &tc);
+      collision = (s.waiters++ != 0 && s.task_count != &task_count);
 
       // This is nuanced: we always want to have the task count of the last
       // thread to join the queue. Otherwise, if threads are leaving and
       // joining the queue simultaneously, we may end up with the task count
       // of a thread group that is no longer waiting.
       //
-      s.tcount = &tc;
+      s.task_count = &task_count;
 
       // We could probably relax the atomic access since we use a mutex for
       // synchronization, though this has a different tradeoff (calling wait
       // because we don't see the count).
       //
-      while (!(s.shutdown || tc <= start_count))
+      while (!(s.shutdown ||
+               (tc = task_count.load (memory_order_acquire)) <= start_count))
         s.condv.wait (l);
 
       s.waiters--;
@@ -101,28 +151,9 @@ namespace build2
 
     // This thread is no longer waiting.
     //
-    {
-      lock l (mutex_);
-      waiting_--;
-
-      if (collision)
-        stat_wait_collisions_++;
-
-      // If we have spare active threads, then become active. Otherwise we
-      // enter the ready queue.
-      //
-      ready_++;
-
-      while (!shutdown_ && active_ >= max_active_)
-        ready_condv_.wait (l);
-
-      ready_--;
-
-      if (shutdown_)
-        throw system_error (ECANCELED, system_category ());
+    activate (collision);
 
-      active_++;
-    }
+    return tc;
   }
 
   void scheduler::
@@ -151,7 +182,7 @@ namespace build2
   size_t scheduler::
   shard_size (size_t mul, size_t div) const
   {
-    size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 2);
+    size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4);
 
     // Experience shows that we want something close to 2x for small numbers,
     // then reduce to 1.5x in-between, and 1x for large ones.
@@ -208,11 +239,11 @@ namespace build2
     //
     lock l (mutex_);
 
-    // Use 8x max_active on 32-bit and 16x max_active on 64-bit. Unless we
+    // Use 16x max_active on 32-bit and 32x max_active on 64-bit. Unless we
     // were asked to run serially.
     //
     if (max_threads == 0)
-      max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*) * 2);
+      max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*) * 4);
 
     assert (shutdown_ &&
             init_active != 0 &&
@@ -229,10 +260,7 @@ namespace build2
     //
     task_queue_depth_ = queue_depth != 0
       ? queue_depth
-      : max_active * sizeof (void*) * 2;
-
-    if (max_active != 1)
-      task_queues_.reserve (max_threads_);
+      : max_active * sizeof (void*) * 4;
 
     queued_task_count_.store (0, memory_order_relaxed);
 
@@ -289,7 +317,11 @@ namespace build2
 
     if (!shutdown_)
     {
-      // Signal shutdown and collect statistics.
+      // Collect statistics.
+      //
+      r.thread_helpers = helpers_;
+
+      // Signal shutdown.
       //
       shutdown_ = true;
 
@@ -300,18 +332,16 @@ namespace build2
         ws.shutdown = true;
       }
 
-      for (unique_ptr<task_queue>& tq: task_queues_)
+      for (task_queue& tq: task_queues_)
       {
-        lock ql (tq->mutex);
-        r.task_queue_full += tq->stat_full;
-        tq->shutdown = true;
+        lock ql (tq.mutex);
+        r.task_queue_full += tq.stat_full;
+        tq.shutdown = true;
      }
 
       // Wait for all the helpers to terminate, waking up any thread that
       // sleeps.
       //
-      r.thread_helpers = helpers_;
-
       while (helpers_ != 0)
       {
         bool i (idle_ != 0);
@@ -344,6 +374,7 @@ namespace build2
     r.thread_max_waiting = stat_max_waiters_;
 
     r.task_queue_depth = task_queue_depth_;
+    r.task_queue_remain = queued_task_count_.load (memory_order_consume);
 
     r.wait_queue_slots = wait_queue_size_;
     r.wait_queue_collisions = stat_wait_collisions_;
@@ -359,8 +390,19 @@ namespace build2
   {
     if (idle_ != 0)
       idle_condv_.notify_one ();
-    else if (init_active_ + helpers_ < max_threads_)
+    else if (init_active_ + helpers_ < max_threads_ ||
+             //
+             // Ignore the max_threads value if we have queued tasks but no
+             // active threads. This means everyone is waiting for something
+             // to happen but nobody is doing anything (e.g., working the
+             // queues). This, for example, can happen if a thread waits for
+             // a task that is in its queue but is below the mark.
+             //
+             (active_ == 0 &&
+              queued_task_count_.load (memory_order_consume) != 0))
+    {
       create_helper (l);
+    }
   }
@@ -412,19 +454,18 @@ namespace build2
     {
       s.active_++;
 
-      while (s.queued_task_count_ != 0)
+      while (s.queued_task_count_.load (memory_order_consume) != 0)
      {
-        // Queues are never removed and there shouldn't be any reallocations
-        // since we reserve maximum possible size upfront. Which means we
-        // can get the current number of queues and release the main lock
-        // while examining each of them.
+        // Queues are never removed, which means we can get the current range
+        // and release the main lock while examining each of them.
         //
-        size_t n (s.task_queues_.size ());
+        auto it (s.task_queues_.begin ());
+        size_t n (s.task_queues_.size ()); // Different to end().
 
         l.unlock ();
         for (size_t i (0); i != n; ++i)
         {
-          task_queue& tq (*s.task_queues_[i]);
+          task_queue& tq (*it++);
 
           for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
             s.pop_front (tq, ql);
@@ -465,18 +506,15 @@ namespace build2
     // Note that task_queue_depth is immutable between startup() and
     // shutdown() (but see join()).
     //
-    unique_ptr<task_queue> tqp (new task_queue (task_queue_depth_));
-    task_queue& tq (*tqp);
-
+    task_queue* tq;
     {
       lock l (mutex_);
-      tq.shutdown = shutdown_;
-      task_queues_.push_back (move (tqp));
+      task_queues_.emplace_back (task_queue_depth_);
+      tq = &task_queues_.back ();
+      tq->shutdown = shutdown_;
     }
 
-    task_queue_ = &tq;
-    return tq;
+    task_queue_ = tq;
+    return *tq;
   }
-
-  scheduler sched;
 }
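
The heart of this change is the wait queue that the new suspend() hashes into:
a thread waiting for a task count to drop is parked on one of a fixed number
of slots, each with its own mutex and condition variable, and the thread that
decrements the count notifies under the same slot mutex, so the check-then-wait
in suspend() cannot miss a wakeup. Below is a minimal standalone sketch of this
scheme, not the build2 code itself; the slot count, the resume() signature, and
the omission of shutdown handling are simplifying assumptions.

#include <atomic>
#include <mutex>
#include <cstddef>
#include <functional>          // std::hash
#include <condition_variable>

using atomic_count = std::atomic<std::size_t>;

struct wait_slot
{
  std::mutex mutex;
  std::condition_variable condv;
  std::size_t waiters = 0;
  const atomic_count* task_count = nullptr; // Last waiter's count.
};

// Fixed-size slot table: unrelated counts can hash to the same slot (a
// "collision"), which costs spurious wakeups but never lost ones.
//
const std::size_t wait_queue_size = 251; // Assumed size; prime to spread.
static wait_slot wait_queue[wait_queue_size];

// Block until task_count drops to start_count or below; return the
// observed value.
//
std::size_t
suspend (std::size_t start_count, const atomic_count& task_count)
{
  wait_slot& s (
    wait_queue[
      std::hash<const atomic_count*> () (&task_count) % wait_queue_size]);

  std::size_t tc;
  std::unique_lock<std::mutex> l (s.mutex);

  s.waiters++;
  s.task_count = &task_count; // Last thread to join the queue wins.

  // The count is (re-)checked under the slot mutex: resume() notifies
  // under the same mutex, so a decrement cannot slip in between the
  // check and the wait.
  //
  while ((tc = task_count.load (std::memory_order_acquire)) > start_count)
    s.condv.wait (l);

  s.waiters--;
  return tc;
}

// Decrement the count and wake this slot's waiters, if any.
//
void
resume (atomic_count& task_count)
{
  wait_slot& s (
    wait_queue[
      std::hash<const atomic_count*> () (&task_count) % wait_queue_size]);

  std::lock_guard<std::mutex> l (s.mutex);
  task_count.fetch_sub (1, std::memory_order_release);

  if (s.waiters != 0)
    s.condv.notify_all ();
}

Note the notify_all(): because unrelated task counts can collide on a slot, a
wakeup may rouse threads waiting on a different count, which simply recheck and
go back to sleep. The patch tracks exactly this cost in stat_wait_collisions_.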
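
The patch also splits the bookkeeping that used to live inline in suspend()
into a deactivate()/activate() pair: before parking, a thread gives up its
active slot (waking a ready master or, if tasks are queued, a helper), and
after waking it reclaims one, queueing in the ready list while the scheduler is
at max_active. A condensed sketch of that pairing follows, with invented names,
the helper-activation hook reduced to a comment, and the shutdown path elided.

#include <mutex>
#include <cstddef>
#include <condition_variable>

class scheduler_core
{
public:
  // Called before blocking: give up the active slot so someone else
  // can make progress.
  //
  void
  deactivate ()
  {
    std::lock_guard<std::mutex> l (mutex_);

    active_--;
    waiting_++;

    // A spare active slot: prefer waking a ready (over-limit) thread;
    // a helper could otherwise be activated here to work the queues.
    //
    if (ready_ != 0)
      ready_condv_.notify_one ();
  }

  // Called after waking up: reclaim an active slot, queueing in the
  // ready list while the scheduler is already at max_active.
  //
  void
  activate ()
  {
    std::unique_lock<std::mutex> l (mutex_);

    waiting_--;
    ready_++;

    while (active_ >= max_active_)
      ready_condv_.wait (l);

    ready_--;
    active_++;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_condv_;
  std::size_t active_ = 1;   // Threads running tasks.
  std::size_t waiting_ = 0;  // Threads parked on a task count.
  std::size_t ready_ = 0;    // Awake but over the active limit.
  std::size_t max_active_ = 8;
};

Every deactivate() is paired with exactly one activate(), so the number of
threads actually running tasks stays bounded by max_active_ even though
arbitrarily many may be suspended in the wait queue.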