From 977d07a3ae47ef204665d1eda2d642e5064724f3 Mon Sep 17 00:00:00 2001
From: Boris Kolpackov <boris@codesynthesis.com>
Date: Mon, 24 Jun 2019 12:01:19 +0200
Subject: Split build system into library and driver

---
 build2/scheduler.cxx | 802 ---------------------------------------------------
 1 file changed, 802 deletions(-)
 delete mode 100644 build2/scheduler.cxx

diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
deleted file mode 100644
index ad3a640..0000000
--- a/build2/scheduler.cxx
+++ /dev/null
@@ -1,802 +0,0 @@
-// file      : build2/scheduler.cxx -*- C++ -*-
-// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd
-// license   : MIT; see accompanying LICENSE file
-
-#include <build2/scheduler.hxx>
-
-#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
-#  include <pthread.h>
-#  ifdef __FreeBSD__
-#    include <pthread_np.h> // pthread_attr_get_np()
-#  endif
-#endif
-
-#ifndef _WIN32
-#  include <thread> // this_thread::sleep_for()
-#else
-#  include <libbutl/win32-utility.hxx>
-
-#  include <chrono>
-#endif
-
-#include <cerrno>
-#include <exception> // std::terminate()
-
-#include <build2/diagnostics.hxx>
-
-using namespace std;
-
-namespace build2
-{
-  size_t scheduler::
-  wait (size_t start_count, const atomic_count& task_count, work_queue wq)
-  {
-    // Note that task_count is a synchronization point.
-    //
-    size_t tc;
-
-    if ((tc = task_count.load (memory_order_acquire)) <= start_count)
-      return tc;
-
-    assert (max_active_ != 1); // Serial execution, nobody to wait for.
-
-    // See if we can run some of our own tasks.
-    //
-    if (wq != work_none)
-    {
-      // If we are waiting on someone else's task count then there might
-      // still be no queue (set by async()).
-      //
-      if (task_queue* tq = task_queue_)
-      {
-        for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
-        {
-          pop_back (*tq, ql);
-
-          if (wq == work_one)
-          {
-            if ((tc = task_count.load (memory_order_acquire)) <= start_count)
-              return tc;
-          }
-        }
-
-        // Note that an empty task queue doesn't automatically mean the task
-        // count has been decremented (some tasks might still be executing
-        // asynchronously).
-        //
-        if ((tc = task_count.load (memory_order_acquire)) <= start_count)
-          return tc;
-      }
-    }
-
-    return suspend (start_count, task_count);
-  }
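The start_count/task_count protocol that wait() implements can be seen in
isolation in the following stand-alone sketch (hypothetical names, plain
std::thread in place of the scheduler's helpers): a waiter snapshots the
count, each posted task bumps it and decrements it on completion, and the
waiter is released once the count drops back to the snapshot.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

int main ()
{
  std::atomic<std::size_t> task_count (0);

  // wait() semantics: remember the count we started with.
  //
  std::size_t start_count (task_count.load (std::memory_order_acquire));

  // async() semantics: bump the count for each posted task; the task
  // decrements it when done.
  //
  std::vector<std::thread> ts;
  for (int i (0); i != 4; ++i)
  {
    task_count.fetch_add (1, std::memory_order_release);
    ts.emplace_back ([&task_count, i]
    {
      std::printf ("task %d done\n", i);
      task_count.fetch_sub (1, std::memory_order_release);
    });
  }

  // Done once the count drops back to the start count.
  //
  while (task_count.load (std::memory_order_acquire) > start_count)
    std::this_thread::yield ();

  for (std::thread& t: ts)
    t.join ();
}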
-  void scheduler::
-  deactivate ()
-  {
-    if (max_active_ == 1) // Serial execution.
-      return;
-
-    lock l (mutex_);
-
-    active_--;
-    waiting_++;
-    progress_++;
-
-    if (waiting_ > stat_max_waiters_)
-      stat_max_waiters_ = waiting_;
-
-    // A spare active thread has become available. If there are ready
-    // masters or eager helpers, wake someone up.
-    //
-    if (ready_ != 0)
-    {
-      ready_condv_.notify_one ();
-    }
-    else if (queued_task_count_.load (std::memory_order_consume) != 0)
-    {
-      activate_helper (l);
-    }
-    // @@ TODO: Redo as a separate "monitoring" thread.
-    //
-    // This still doesn't work for the phase lock case where we call
-    // deactivate and then go wait on a condition variable: we are doing
-    // deadlock detection while holding the lock that prevents other
-    // threads from making progress!
-    //
-#if 0
-    else if (active_ == 0)
-    {
-      // We may have a deadlock, which can happen because of dependency
-      // cycles.
-      //
-      // Relying on the active_ count alone is not precise enough, however:
-      // some threads might be transitioning between the active/waiting/
-      // ready states. Carefully accounting for this is not trivial, to say
-      // the least (especially in the face of spurious wakeups). So we are
-      // going to do a "fuzzy" deadlock detection by measuring "progress".
-      // The idea is that those transitions should be pretty short-lived and
-      // so if we wait for a couple of hundred context switches, then we
-      // should be able to distinguish a real deadlock from the transition
-      // case.
-      //
-      size_t p (progress_);
-
-      for (size_t i (0); i != 100; ++i)
-      {
-        l.unlock ();
-        this_thread::yield ();
-        l.lock ();
-
-        if (p != progress_)
-          break;
-      }
-
-      if (p == progress_)
-      {
-        // Reactivate and fail.
-        //
-        waiting_--;
-        active_++;
-
-        // Shutting things down cleanly is tricky: we could have handled it
-        // in the scheduler (e.g., by setting a flag and then waking everyone
-        // up, similar to shutdown). But there could also be "external
-        // waiters" that have called deactivate() -- we have no way to wake
-        // those up. So for now we are going to abort (the nice thing about
-        // abort is if this is not a dependency cycle, then we have a core to
-        // examine).
-        //
-        error << "deadlock detected, can be caused by a dependency cycle" <<
-          info << "re-run with -s to diagnose dependency cycles";
-
-        std::terminate ();
-      }
-    }
-#endif
-  }
-
-  void scheduler::
-  activate (bool collision)
-  {
-    if (max_active_ == 1) // Serial execution.
-      return;
-
-    lock l (mutex_);
-
-    if (collision)
-      stat_wait_collisions_++;
-
-    // If we have spare active threads, then become active. Otherwise enter
-    // the ready queue.
-    //
-    waiting_--;
-    ready_++;
-    progress_++;
-
-    while (!shutdown_ && active_ >= max_active_)
-      ready_condv_.wait (l);
-
-    ready_--;
-    active_++;
-    progress_++;
-
-    if (shutdown_)
-      throw_generic_error (ECANCELED);
-  }
-
-  void scheduler::
-  sleep (const duration& d)
-  {
-    deactivate ();
-
-    // MINGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep().
-    //
-#ifndef _WIN32
-    this_thread::sleep_for (d);
-#else
-    using namespace chrono;
-
-    Sleep (static_cast<DWORD> (duration_cast<milliseconds> (d).count ()));
-#endif
-
-    activate ();
-  }
-
-  size_t scheduler::
-  suspend (size_t start_count, const atomic_count& task_count)
-  {
-    wait_slot& s (
-      wait_queue_[
-        hash<const atomic_count*> () (&task_count) % wait_queue_size_]);
-
-    // This thread is no longer active.
-    //
-    deactivate ();
-
-    // Note that the task count is checked while holding the lock. We also
-    // have to notify while holding the lock (see resume()). The aim here
-    // is not to end up with a notification that happens between the check
-    // and the wait.
-    //
-    size_t tc (0);
-    bool collision;
-    {
-      lock l (s.mutex);
-
-      // We have a collision if there is already a waiter for a different
-      // task count.
-      //
-      collision = (s.waiters++ != 0 && s.task_count != &task_count);
-
-      // This is nuanced: we want to always have the task count of the last
-      // thread to join the queue. Otherwise, if threads are leaving and
-      // joining the queue simultaneously, we may end up with the task count
-      // of a thread group that is no longer waiting.
-      //
-      s.task_count = &task_count;
-
-      // We could probably relax the atomic access since we use a mutex for
-      // synchronization, though this has a different tradeoff (calling wait
-      // because we don't see the count).
-      //
-      while (!(s.shutdown ||
-               (tc = task_count.load (memory_order_acquire)) <= start_count))
-        s.condv.wait (l);
-
-      s.waiters--;
-    }
-
-    // This thread is no longer waiting.
-    //
-    activate (collision);
-
-    return tc;
-  }
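suspend() checks the task count and waits under the slot mutex, and resume()
notifies under that same mutex, precisely to close the window where a
notification could land between the check and the wait. A stand-alone
illustration of the same pattern (generic names, not build2 code):

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
std::atomic<int> count (1);

void waiter ()
{
  // Test the count and block under the same mutex the resumer notifies
  // under, so no notification can slip in between the check and the wait.
  //
  std::unique_lock<std::mutex> l (m);
  cv.wait (l, [] {return count.load (std::memory_order_acquire) == 0;});
}

void resumer ()
{
  count.fetch_sub (1, std::memory_order_release);

  std::lock_guard<std::mutex> l (m); // Hold the lock while notifying.
  cv.notify_all ();
}

int main ()
{
  std::thread w (waiter), r (resumer);
  w.join ();
  r.join ();
}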
-  void scheduler::
-  resume (const atomic_count& tc)
-  {
-    if (max_active_ == 1) // Serial execution, nobody to wake up.
-      return;
-
-    wait_slot& s (
-      wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
-
-    // See suspend() for why we must hold the lock.
-    //
-    lock l (s.mutex);
-
-    if (s.waiters != 0)
-      s.condv.notify_all ();
-  }
-
-  scheduler::
-  ~scheduler ()
-  {
-    try { shutdown (); } catch (system_error&) {}
-  }
-
-  auto scheduler::
-  wait_idle () -> lock
-  {
-    lock l (mutex_);
-
-    assert (waiting_ == 0);
-    assert (ready_ == 0);
-
-    while (active_ != init_active_ || starting_ != 0)
-    {
-      l.unlock ();
-      this_thread::yield ();
-      l.lock ();
-    }
-
-    return l;
-  }
-
-  size_t scheduler::
-  shard_size (size_t mul, size_t div) const
-  {
-    size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4);
-
-    // Experience shows that we want something close to 2x for small
-    // numbers, then reduce to 1.5x in-between, and 1x for large ones.
-    //
-    // Note that Intel Xeons are all over the map when it comes to cores
-    // (6, 8, 10, 12, 14, 16, 18, 20, 22).
-    //
-    return              // HW threads x arch-bits (see max_threads below)
-      n ==   0 ?   1 :  // serial
-      //
-      // 2x
-      //
-      n ==   1 ?   3 :
-      n ==   2 ?   5 :
-      n ==   4 ?  11 :
-      n ==   6 ?  13 :
-      n ==   8 ?  17 :  //  2 x 4
-      n ==  16 ?  31 :  //  4 x 4, 2 x 8
-      //
-      // 1.5x
-      //
-      n ==  32 ?  47 :  //  4 x 8
-      n ==  48 ?  53 :  //  6 x 8
-      n ==  64 ?  67 :  //  8 x 8
-      n ==  80 ?  89 :  // 10 x 8
-      //
-      // 1x
-      //
-      n ==  96 ? 101 :  // 12 x 8
-      n == 112 ? 127 :  // 14 x 8
-      n == 128 ? 131 :  // 16 x 8
-      n == 144 ? 139 :  // 18 x 8
-      n == 160 ? 157 :  // 20 x 8
-      n == 176 ? 173 :  // 22 x 8
-      n == 192 ? 191 :  // 24 x 8
-      n == 224 ? 223 :  // 28 x 8
-      n == 256 ? 251 :  // 32 x 8
-      n == 288 ? 271 :  // 36 x 8
-      n == 320 ? 313 :  // 40 x 8
-      n == 352 ? 331 :  // 44 x 8
-      n == 384 ? 367 :  // 48 x 8
-      n == 512 ? 499 :  // 64 x 8
-      n - 1;            // Assume it is even.
-  }
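To make the sharding concrete: suspend() and resume() pick a wait slot by
hashing the task count's address modulo this near-prime shard size (per the
table, n == 16, e.g. two hardware threads on 64-bit, gives 31 slots). A toy
sketch of the same arithmetic, with made-up counters standing in for task
counts:

#include <cstddef>
#include <cstdio>
#include <functional>

int main ()
{
  const std::size_t slots (31); // shard_size() result for n == 16.

  // Hash each counter's address into a slot, as suspend()/resume() do.
  //
  int counters[4];
  for (int& c: counters)
  {
    std::size_t slot (std::hash<int*> () (&c) % slots);
    std::printf ("%p -> slot %zu\n", static_cast<void*> (&c), slot);
  }
}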
-  void scheduler::
-  startup (size_t max_active,
-           size_t init_active,
-           size_t max_threads,
-           size_t queue_depth,
-           optional<size_t> max_stack)
-  {
-    // Lock the mutex to make sure our changes are visible in (other)
-    // active threads.
-    //
-    lock l (mutex_);
-
-    max_stack_ = max_stack;
-
-    // Use 8x max_active on 32-bit and 32x max_active on 64-bit, unless we
-    // were asked to run serially.
-    //
-    if (max_threads == 0)
-      max_threads = (max_active == 1 ? 1 :
-                     sizeof (void*) < 8 ? 8 : 32) * max_active;
-
-    assert (shutdown_ &&
-            init_active != 0 &&
-            init_active <= max_active &&
-            max_active <= max_threads);
-
-    active_ = init_active_ = init_active;
-    max_active_ = orig_max_active_ = max_active;
-    max_threads_ = max_threads;
-
-    // This value should be proportional to the amount of hardware
-    // concurrency we have (no use queuing things up if helpers cannot keep
-    // up). Note that the queue entry is quite sizable.
-    //
-    // The relationship is as follows: we want to have a deeper queue if the
-    // tasks take long (e.g., compilation) and a shorter one if they are
-    // quick (e.g., test execution). If the tasks are quick then the
-    // synchronization overhead required for queuing/dequeuing things starts
-    // to dominate.
-    //
-    task_queue_depth_ = queue_depth != 0
-      ? queue_depth
-      : max_active * 4;
-
-    queued_task_count_.store (0, memory_order_relaxed);
-
-    if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
-      wait_queue_.reset (new wait_slot[wait_queue_size_]);
-
-    // Reset counters.
-    //
-    stat_max_waiters_ = 0;
-    stat_wait_collisions_ = 0;
-
-    progress_ = 0;
-
-    for (size_t i (0); i != wait_queue_size_; ++i)
-      wait_queue_[i].shutdown = false;
-
-    shutdown_ = false;
-  }
-
-  void scheduler::
-  tune (size_t max_active)
-  {
-    if (max_active == 0)
-      max_active = orig_max_active_;
-
-    assert (max_active >= init_active_ &&
-            max_active <= orig_max_active_);
-
-    // The scheduler must not be active, though some threads might still be
-    // coming off from finishing a task. So we busy-wait for them.
-    //
-    lock l (wait_idle ());
-
-    max_active_ = max_active;
-  }
-
-  auto scheduler::
-  shutdown () -> stat
-  {
-    // Our overall approach to shutdown is not to try and stop everything
-    // as quickly as possible but rather to avoid performing any tasks.
-    // This avoids having code littered with if (shutdown) on every other
-    // line.
-
-    stat r;
-    lock l (mutex_);
-
-    if (!shutdown_)
-    {
-      // Collect statistics.
-      //
-      r.thread_helpers = helpers_;
-
-      // Signal shutdown.
-      //
-      shutdown_ = true;
-
-      for (size_t i (0); i != wait_queue_size_; ++i)
-      {
-        wait_slot& ws (wait_queue_[i]);
-        lock l (ws.mutex);
-        ws.shutdown = true;
-      }
-
-      for (task_queue& tq: task_queues_)
-      {
-        lock ql (tq.mutex);
-        r.task_queue_full += tq.stat_full;
-        tq.shutdown = true;
-      }
-
-      // Wait for all the helpers to terminate, waking up any threads that
-      // are sleeping.
-      //
-      while (helpers_ != 0)
-      {
-        bool i (idle_ != 0);
-        bool r (ready_ != 0);
-        bool w (waiting_ != 0);
-
-        l.unlock ();
-
-        if (i)
-          idle_condv_.notify_all ();
-
-        if (r)
-          ready_condv_.notify_all ();
-
-        if (w)
-          for (size_t i (0); i != wait_queue_size_; ++i)
-            wait_queue_[i].condv.notify_all ();
-
-        this_thread::yield ();
-        l.lock ();
-      }
-
-      // Free the memory.
-      //
-      wait_queue_.reset ();
-      task_queues_.clear ();
-
-      r.thread_max_active = orig_max_active_;
-      r.thread_max_total = max_threads_;
-      r.thread_max_waiting = stat_max_waiters_;
-
-      r.task_queue_depth = task_queue_depth_;
-      r.task_queue_remain = queued_task_count_.load (memory_order_consume);
-
-      r.wait_queue_slots = wait_queue_size_;
-      r.wait_queue_collisions = stat_wait_collisions_;
-    }
-
-    return r;
-  }
-
-  scheduler::monitor_guard scheduler::
-  monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
-  {
-    assert (monitor_count_ == nullptr && t != 0);
-
-    // While the scheduler must not be active, some threads might still be
-    // coming off from finishing a task and trying to report progress. So we
-    // busy-wait for them (also in ~monitor_guard()).
-    //
-    lock l (wait_idle ());
-
-    monitor_count_ = &c;
-    monitor_tshold_.store (t, memory_order_relaxed);
-    monitor_init_ = c.load (memory_order_relaxed);
-    monitor_func_ = move (f);
-
-    return monitor_guard (this);
-  }
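Hypothetical usage of the monitoring hook: a sketch assuming the count-in,
next-threshold-out callback shape shown in the signature above (the names
and the single-threaded driver are illustrative, not the build2 API; in the
scheduler the threshold check is performed by the worker threads as they
bump the monitored counter):

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <functional>

int main ()
{
  std::atomic<std::size_t> count (0);

  // Fire every five increments: report progress and return the next
  // threshold at which the callback should fire again.
  //
  std::size_t threshold (5);
  std::function<std::size_t (std::size_t)> func (
    [] (std::size_t n)
    {
      std::printf ("%zu targets processed\n", n);
      return n + 5;
    });

  for (int i (0); i != 12; ++i)
  {
    std::size_t n (count.fetch_add (1, std::memory_order_relaxed) + 1);

    if (n >= threshold)
      threshold = func (n);
  }
}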
-  void scheduler::
-  activate_helper (lock& l)
-  {
-    if (!shutdown_)
-    {
-      if (idle_ != 0)
-      {
-        idle_condv_.notify_one ();
-      }
-      //
-      // Ignore the max_threads value if we have queued tasks but no active
-      // threads. This means everyone is waiting for something to happen but
-      // nobody is doing anything (e.g., working the queues). This, for
-      // example, can happen if a thread waits for a task that is in its
-      // queue but is below the mark.
-      //
-      else if (init_active_ + helpers_ < max_threads_ ||
-               (active_ == 0 &&
-                queued_task_count_.load (memory_order_consume) != 0))
-      {
-        create_helper (l);
-      }
-    }
-  }
-
-  void scheduler::
-  create_helper (lock& l)
-  {
-    helpers_++;
-    starting_++;
-    l.unlock ();
-
-    // Restore the counters if the thread creation fails.
-    //
-    struct guard
-    {
-      lock* l;
-      size_t& h;
-      size_t& s;
-
-      ~guard () {if (l != nullptr) {l->lock (); h--; s--;}}
-
-    } g {&l, helpers_, starting_};
-
-    // For some platforms/compilers the default stack size for newly created
-    // threads may differ from that of the main thread. Here are the default
-    // main/new thread sizes (in KB) for some of them:
-    //
-    // Linux   :   8192 / 8192
-    // FreeBSD : 524288 / 2048
-    // MacOS   :   8192 /  512
-    // MinGW   :   2048 / 2048
-    // VC      :   1024 / 1024
-    //
-    // Provided the main thread stack size is not greater than
-    // BUILD2_SANE_STACK_SIZE (default: sizeof(void*) *
-    // BUILD2_DEFAULT_STACK_SIZE), we make sure that the new thread stack is
-    // the same as for the main thread. Otherwise, we cap it at
-    // BUILD2_DEFAULT_STACK_SIZE (default: 8MB). This can also be overridden
-    // at runtime with the --max-stack option (remember to update its
-    // documentation if changing anything here).
-    //
-    // On Windows the stack size is the same for all threads and is
-    // customized at the linking stage (see build2/buildfile). Thus neither
-    // *_STACK_SIZE nor --max-stack have any effect here.
-    //
-    // On Linux, FreeBSD and MacOS there is no way to change it once and for
-    // all newly created threads. Thus we will use pthreads, creating
-    // threads with the stack size of the current thread. This way all
-    // threads will inherit the main thread's stack size (since the first
-    // helper is always created by the main thread).
-    //
-    // Note also the interaction with our backtrace functionality: in order
-    // to get the complete stack trace we let unhandled exceptions escape
-    // the thread function expecting the runtime to still call
-    // std::terminate. In particular, having a noexcept function anywhere on
-    // the exception's path causes the stack trace to be truncated, at
-    // least on Linux.
-    //
-#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
-
-#ifndef BUILD2_DEFAULT_STACK_SIZE
-#  define BUILD2_DEFAULT_STACK_SIZE 8388608 // 8MB
-#endif
-
-#ifndef BUILD2_SANE_STACK_SIZE
-#  define BUILD2_SANE_STACK_SIZE (sizeof(void*) * BUILD2_DEFAULT_STACK_SIZE)
-#endif
-
-    // Auto-deleter.
-    //
-    struct attr_deleter
-    {
-      void
-      operator() (pthread_attr_t* a) const
-      {
-        int r (pthread_attr_destroy (a));
-
-        // We should be able to destroy a valid attributes object, unless
-        // something is severely damaged.
-        //
-        assert (r == 0);
-      }
-    };
-
-    // Calculate the current thread stack size. Don't forget to update the
-    // #if conditions above when adding stack size customization for new
-    // platforms/compilers.
-    //
-    size_t stack_size;
-    {
-#ifdef __linux__
-      // Note that the attributes must not be initialized.
-      //
-      pthread_attr_t attr;
-      int r (pthread_getattr_np (pthread_self (), &attr));
-
-      if (r != 0)
-        throw_system_error (r);
-
-      unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
-      r = pthread_attr_getstacksize (&attr, &stack_size);
-
-      if (r != 0)
-        throw_system_error (r);
-
-#elif defined(__FreeBSD__)
-      pthread_attr_t attr;
-      int r (pthread_attr_init (&attr));
-
-      if (r != 0)
-        throw_system_error (r);
-
-      unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
-      r = pthread_attr_get_np (pthread_self (), &attr);
-
-      if (r != 0)
-        throw_system_error (r);
-
-      r = pthread_attr_getstacksize (&attr, &stack_size);
-
-      if (r != 0)
-        throw_system_error (r);
-
-#else // defined(__APPLE__)
-      stack_size = pthread_get_stacksize_np (pthread_self ());
-#endif
-    }
-    // Cap the size if necessary.
-    //
-    if (max_stack_)
-    {
-      if (*max_stack_ != 0 && stack_size > *max_stack_)
-        stack_size = *max_stack_;
-    }
-    else if (stack_size > BUILD2_SANE_STACK_SIZE)
-      stack_size = BUILD2_DEFAULT_STACK_SIZE;
-
-    pthread_attr_t attr;
-    int r (pthread_attr_init (&attr));
-
-    if (r != 0)
-      throw_system_error (r);
-
-    unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
-
-    // Create the thread already detached.
-    //
-    r = pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-
-    if (r != 0)
-      throw_system_error (r);
-
-    r = pthread_attr_setstacksize (&attr, stack_size);
-
-    if (r != 0)
-      throw_system_error (r);
-
-    pthread_t t;
-    r = pthread_create (&t, &attr, helper, this);
-
-    if (r != 0)
-      throw_system_error (r);
-#else
-    thread t (helper, this);
-    t.detach ();
-#endif
-
-    g.l = nullptr; // Disarm.
-  }
-
-  void* scheduler::
-  helper (void* d)
-  {
-    scheduler& s (*static_cast<scheduler*> (d));
-
-    // Note that this thread can be in an in-between state (not active or
-    // idle) but only while holding the lock. Which means that if we have
-    // the lock then we can account for all of them (this is important
-    // during shutdown). Except when the thread is just starting, before
-    // acquiring the lock for the first time, which we handle with the
-    // starting count.
-    //
-    lock l (s.mutex_);
-    s.starting_--;
-
-    while (!s.shutdown_)
-    {
-      // If there is a spare active thread, become active and go looking
-      // for some work.
-      //
-      if (s.active_ < s.max_active_)
-      {
-        s.active_++;
-
-        while (s.queued_task_count_.load (memory_order_consume) != 0)
-        {
-          // Queues are never removed, which means we can get the current
-          // range and release the main lock while examining each of them.
-          //
-          auto it (s.task_queues_.begin ());
-          size_t n (s.task_queues_.size ()); // Different from end().
-          l.unlock ();
-
-          // Note: we have to be careful not to advance the iterator past
-          // the last element (since what's past could be changing).
-          //
-          for (size_t i (0);; ++it)
-          {
-            task_queue& tq (*it);
-
-            for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
-              s.pop_front (tq, ql);
-
-            if (++i == n)
-              break;
-          }
-
-          l.lock ();
-        }
-
-        s.active_--;
-
-        // While executing the tasks a thread might have become ready.
-        //
-        if (s.ready_ != 0)
-          s.ready_condv_.notify_one ();
-      }
-
-      // Become idle and wait for a notification.
-      //
-      s.idle_++;
-      s.idle_condv_.wait (l);
-      s.idle_--;
-    }
-
-    s.helpers_--;
-    return nullptr;
-  }
-
-#ifdef __cpp_thread_local
-  thread_local
-#else
-  __thread
-#endif
-  scheduler::task_queue* scheduler::task_queue_ = nullptr;
-
-  auto scheduler::
-  create_queue () -> task_queue&
-  {
-    // Note that task_queue_depth is immutable between startup() and
-    // shutdown() (but see join()).
-    //
-    task_queue* tq;
-    {
-      lock l (mutex_);
-      task_queues_.emplace_back (task_queue_depth_);
-      tq = &task_queues_.back ();
-      tq->shutdown = shutdown_;
-    }
-
-    task_queue_ = tq;
-    return *tq;
-  }
-}
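For reference, the helper's drain-then-park cycle above can be condensed
into the following stand-alone sketch (generic names, a single queue, and
no active/max_active accounting; this is an illustration of the pattern,
not build2 code):

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable idle;
std::deque<std::function<void ()>> tasks;
bool done = false;

void helper ()
{
  std::unique_lock<std::mutex> l (m);

  for (;;)
  {
    // Drain the queue, running each task without holding the lock.
    //
    while (!tasks.empty ())
    {
      std::function<void ()> t (std::move (tasks.front ()));
      tasks.pop_front ();

      l.unlock ();
      t ();
      l.lock ();
    }

    if (done)
      break;

    idle.wait (l); // Park until new work or shutdown arrives.
  }
}

int main ()
{
  std::thread h (helper);

  {
    std::lock_guard<std::mutex> l (m);
    tasks.push_back ([] {});
    idle.notify_one ();
  }

  {
    std::lock_guard<std::mutex> l (m);
    done = true;
    idle.notify_one ();
  }

  h.join ();
}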