From ea22643b2217921df74ea14df47d7c83987d5761 Mon Sep 17 00:00:00 2001
From: Boris Kolpackov
Date: Fri, 9 Dec 2016 17:29:27 +0200
Subject: Initial parallel scheduler implementation, use to run testscripts

---
 build2/scheduler.cxx | 383 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 383 insertions(+)
 create mode 100644 build2/scheduler.cxx

diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
new file mode 100644
index 0000000..b62d2d4
--- /dev/null
+++ b/build2/scheduler.cxx
@@ -0,0 +1,383 @@
+// file      : build2/scheduler.cxx -*- C++ -*-
+// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
+// license   : MIT; see accompanying LICENSE file
+
+#include <build2/scheduler>
+
+using namespace std;
+
+namespace build2
+{
+  void scheduler::
+  wait (atomic_count& task_count)
+  {
+    if (task_count == 0)
+      return;
+
+    // See if we can run some of our own tasks.
+    //
+    task_queue& tq (*task_queue_); // Must have been initialized by async().
+
+    for (lock ql (tq.mutex); !tq.shutdown && !empty_back (tq); )
+    {
+      // Save the old stop point and set the new one in case the task we are
+      // about to run adds sub-tasks.
+      //
+      size_t stop (tq.stop);
+      tq.stop = tq.tail - 1; // Index of the first sub-task to be added (-1
+                             // is for the following pop_back()).
+
+      pop_back (tq, ql); // Releases the lock.
+      ql.lock ();
+
+      // Restore the old stop point which we might have to adjust.
+      //
+      tq.stop = tq.head > stop ? tq.head : tq.tail < stop ? tq.tail : stop;
+    }
+
+    // Note that an empty task queue doesn't automatically mean the task
+    // count is zero (some might still be executing asynchronously).
+    //
+    if (task_count == 0)
+      return;
+
+    suspend (task_count);
+  }
+
+  void scheduler::
+  suspend (atomic_count& tc)
+  {
+    wait_slot& s (
+      wait_queue_[std::hash<atomic_count*> () (&tc) % wait_queue_size_]);
+
+    // This thread is no longer active.
+    //
+    {
+      lock l (mutex_);
+      active_--;
+      waiting_++;
+
+      if (waiting_ > stat_max_waiters_)
+        stat_max_waiters_ = waiting_;
+
+      // A spare active thread has become available. If there are ready
+      // masters or eager helpers, wake someone up.
+      //
+      if (ready_ != 0)
+        ready_condv_.notify_one ();
+      else if (task_)
+        activate_helper (l);
+    }
+
+    // Note that the task count is checked while holding the lock. We also
+    // have to notify while holding the lock (see resume()). The aim here
+    // is not to end up with a notification that happens between the check
+    // and the wait.
+    //
+    bool collision;
+    {
+      lock l (s.mutex);
+      collision = (s.waiters++ != 0);
+
+      // Since we use a mutex for synchronization, we can relax the atomic
+      // access.
+      //
+      while (!s.shutdown && tc.load (std::memory_order_relaxed) != 0)
+        s.condv.wait (l);
+
+      s.waiters--;
+    }
+
+    // This thread is no longer waiting.
+    //
+    {
+      lock l (mutex_);
+      waiting_--;
+
+      if (collision)
+        stat_wait_collisions_++;
+
+      // If we have spare active threads, then become active. Otherwise
+      // enter the ready queue.
+      //
+      ready_++;
+
+      while (!shutdown_ && active_ >= max_active_)
+        ready_condv_.wait (l);
+
+      ready_--;
+
+      if (shutdown_)
+        throw system_error (ECANCELED, system_category ());
+
+      active_++;
+    }
+  }
+
+  void scheduler::
+  resume (atomic_count& tc)
+  {
+    wait_slot& s (
+      wait_queue_[std::hash<atomic_count*> () (&tc) % wait_queue_size_]);
+
+    // See suspend() for why we must hold the lock.
+    //
+    lock l (s.mutex);
+
+    if (s.waiters != 0)
+      s.condv.notify_all ();
+  }
+
+  scheduler::
+  ~scheduler ()
+  {
+    try { shutdown (); } catch (system_error&) {}
+  }
+
+  void scheduler::
+  startup (size_t max_active, size_t init_active, size_t max_threads)
+  {
+    // Use 4x max_active on 32-bit and 8x max_active on 64-bit, unless we
+    // were asked to run serially.
+    //
+    if (max_threads == 0)
+      max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*));
+
+    assert (shutdown_ &&
+            init_active != 0 &&
+            init_active <= max_active &&
+            max_active <= max_threads);
+
+    active_ = init_active_ = init_active;
+    max_active_ = max_active;
+    max_threads_ = max_threads;
+
+    // This value should be proportional to the amount of hardware
+    // concurrency we have. Note that the queue entry is quite sizable.
+    //
+    task_queue_depth_ = max_active * sizeof (void*) * 4;
+
+    task_queues_.clear ();
+    task_queues_.reserve (max_threads_);
+
+    // Pick a nice prime for common max_threads numbers. Though Intel Xeons
+    // are all over the map when it comes to cores (6, 8, 10, 12, 14, 16,
+    // 18, 20, 22).
+    //
+    wait_queue_size_ =            // HW threads x bits
+      max_threads ==   8 ?   17 : // 2 x 4
+      max_threads ==  16 ?   31 : // 4 x 4, 2 x 8
+      max_threads ==  32 ?   63 : // 4 x 8
+      max_threads ==  48 ?   97 : // 6 x 8
+      max_threads ==  64 ?  127 : // 8 x 8
+      max_threads ==  96 ?  191 : // 12 x 8
+      max_threads == 128 ?  257 : // 16 x 8
+      max_threads == 192 ?  383 : // 24 x 8
+      max_threads == 256 ?  509 : // 32 x 8
+      max_threads == 384 ?  769 : // 48 x 8
+      max_threads == 512 ? 1021 : // 64 x 8
+      2 * max_threads - 1;
+
+    wait_queue_.reset (new wait_slot[wait_queue_size_]);
+
+    // Reset stats counters.
+    //
+    stat_max_waiters_ = 0;
+    stat_wait_collisions_ = 0;
+
+    task_ = false;
+    shutdown_ = false;
+
+    for (size_t i (0); i != wait_queue_size_; ++i)
+      wait_queue_[i].shutdown = false;
+  }
+
+  auto scheduler::
+  shutdown () -> stat
+  {
+    // Our overall approach to shutdown is not to try and stop everything as
+    // quickly as possible but rather to avoid performing any new tasks.
+    // This avoids having code littered with if (shutdown) on every second
+    // line.
+
+    stat r;
+    lock l (mutex_);
+
+    if (!shutdown_)
+    {
+      // Signal shutdown and collect statistics.
+      //
+      shutdown_ = true;
+
+      for (size_t i (0); i != wait_queue_size_; ++i)
+      {
+        wait_slot& ws (wait_queue_[i]);
+        lock l (ws.mutex);
+        ws.shutdown = true;
+      }
+
+      for (unique_ptr<task_queue>& tq: task_queues_)
+      {
+        lock l (tq->mutex);
+        r.task_queue_full += tq->stat_full;
+        tq->shutdown = true;
+      }
+
+      // Wait for all the helpers to terminate, waking up any thread that
+      // sleeps.
+      //
+      r.thread_helpers = helpers_;
+
+      while (helpers_ != 0)
+      {
+        bool i (idle_ != 0);
+        bool r (ready_ != 0);
+        bool w (waiting_ != 0);
+
+        l.unlock ();
+
+        if (i)
+          idle_condv_.notify_all ();
+
+        if (r)
+          ready_condv_.notify_all ();
+
+        if (w)
+          for (size_t i (0); i != wait_queue_size_; ++i)
+            wait_queue_[i].condv.notify_all ();
+
+        this_thread::yield ();
+        l.lock ();
+      }
+
+      r.thread_max_active = max_active_;
+      r.thread_max_total = max_threads_;
+      r.thread_max_waiting = stat_max_waiters_;
+
+      r.task_queue_depth = task_queue_depth_;
+
+      r.wait_queue_slots = wait_queue_size_;
+      r.wait_queue_collisions = stat_wait_collisions_;
+    }
+
+    return r;
+  }
+
+  void scheduler::
+  activate_helper (lock& l)
+  {
+    if (!shutdown_)
+    {
+      if (idle_ != 0)
+        idle_condv_.notify_one ();
+      else if (init_active_ + helpers_ < max_threads_)
+        create_helper (l);
+    }
+  }
+
+  void scheduler::
+  create_helper (lock& l)
+  {
+    helpers_++;
+    starting_++;
+    l.unlock ();
+
+    // Restore the counters if the thread creation fails.
+    //
+    struct guard
+    {
+      lock* l;
+      size_t& h;
+      size_t& s;
+
+      ~guard () {if (l != nullptr) {l->lock (); h--; s--;}}
+
+    } g {&l, helpers_, starting_};
+
+    thread t (helper, this);
+    g.l = nullptr; // Disarm.
+
+    t.detach ();
+  }
+
+  void scheduler::
+  helper (void* d)
+  {
+    scheduler& s (*static_cast<scheduler*> (d));
+
+    // Note that this thread can be in an in-between state (not active or
+    // idle) but only while holding the lock. Which means that if we have
+    // the lock then we can account for all of them (this is important
+    // during shutdown). Except when the thread is just starting, before
+    // acquiring the lock for the first time, which we handle with the
+    // starting count.
+    //
+    lock l (s.mutex_);
+    s.starting_--;
+
+    while (!s.shutdown_)
+    {
+      // If there is a spare active thread, become active and go looking
+      // for some work.
+      //
+      if (s.active_ < s.max_active_)
+      {
+        s.active_++;
+
+        while (s.task_) // There might be a task.
+        {
+          s.task_ = false; // We will process all that are currently there.
+
+          // Queues are never removed and there shouldn't be any
+          // reallocations since we reserve the maximum possible size
+          // upfront. Which means we can get the current number of queues
+          // and release the main lock while examining each of them.
+          //
+          size_t n (s.task_queues_.size ());
+          l.unlock ();
+
+          for (size_t i (0); i != n; ++i)
+          {
+            task_queue& tq (*s.task_queues_[i]);
+
+            for (lock ql (tq.mutex);
+                 !tq.shutdown && !s.empty_front (tq);
+                 ql.lock ())
+              s.pop_front (tq, ql); // Releases the lock.
+          }
+
+          l.lock ();
+          // If task_ became true, then there might be new tasks.
+        }
+
+        s.active_--;
+
+        // While executing the tasks a thread might have become ready.
+        //
+        if (s.ready_ != 0)
+          s.ready_condv_.notify_one ();
+      }
+
+      // Become idle and wait for a notification (note that task_ is false
+      // here).
+      //
+      s.idle_++;
+      s.idle_condv_.wait (l);
+      s.idle_--;
+    }
+
+    s.helpers_--;
+  }
+
+  thread_local scheduler::task_queue* scheduler::task_queue_ = nullptr;
+
+  auto scheduler::
+  create_queue () -> task_queue&
+  {
+    lock l (mutex_);
+    task_queues_.push_back (make_unique<task_queue> (task_queue_depth_));
+    task_queue_ = task_queues_.back ().get ();
+    task_queue_->shutdown = shutdown_;
+    return *task_queue_;
+  }
+
+  scheduler sched;
+}
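
For readers following the diff, below is a minimal sketch of how a caller might drive this scheduler. It is an illustration only and not part of the commit: the header is not included in this diff, so the call form of async() (which is only referenced in the wait() comment above), the atomic_count type (assumed here to behave like std::atomic<size_t>), and the compute() helper are all assumptions made for the example. The global sched object and the startup()/wait()/shutdown() members do appear in the patch.

    #include <thread>           // std::thread::hardware_concurrency()

    #include <build2/scheduler> // Assumed to pull in atomic_count as well.

    using namespace build2;

    // Example task body; anything callable that async() can queue
    // (hypothetical helper, not from the patch).
    //
    static void
    compute (size_t n)
    {
      // ... perform work proportional to n ...
    }

    int
    main ()
    {
      // One active thread per hardware thread; passing 0 for max_threads
      // lets startup() compute its own default (see startup() above).
      //
      size_t n (std::thread::hardware_concurrency ());
      sched.startup (n != 0 ? n : 1, 1, 0);

      atomic_count task_count (0); // Assumed std::atomic<size_t>-like.

      // Queue some tasks. The async() call form is assumed; this diff only
      // mentions async() in the wait() comment.
      //
      for (size_t i (0); i != 16; ++i)
        sched.async (task_count, &compute, i);

      // Block until all queued tasks complete, helping to run tasks from
      // this thread's own queue in the meantime (see wait() above).
      //
      sched.wait (task_count);

      sched.shutdown ();
    }

The ordering matters for this sketch: wait() dereferences the thread-local task_queue_, which the wait() comment says must have been initialized by async(), so async() is called from the waiting thread before wait().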