diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2019-06-24 12:01:19 +0200 |
---|---|---|
committer | Karen Arutyunov <karen@codesynthesis.com> | 2019-07-01 18:13:55 +0300 |
commit | 977d07a3ae47ef204665d1eda2d642e5064724f3 (patch) | |
tree | 525a3d6421f61ce789b690191d3c30fc09be3517 /libbuild2/scheduler.cxx | |
parent | 7161b24963dd9da4d218f92c736b77c35c328a2d (diff) |
Split build system into library and driver
Diffstat (limited to 'libbuild2/scheduler.cxx')
-rw-r--r-- | libbuild2/scheduler.cxx | 820 |
1 files changed, 820 insertions, 0 deletions
diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx new file mode 100644 index 0000000..8ac2b97 --- /dev/null +++ b/libbuild2/scheduler.cxx @@ -0,0 +1,820 @@ +// file : libbuild2/scheduler.cxx -*- C++ -*- +// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd +// license : MIT; see accompanying LICENSE file + +#include <libbuild2/scheduler.hxx> + +#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) +# include <pthread.h> +# ifdef __FreeBSD__ +# include <pthread_np.h> // pthread_attr_get_np() +# endif +#endif + +#ifndef _WIN32 +# include <thread> // this_thread::sleep_for() +#else +# include <libbutl/win32-utility.hxx> + +# include <chrono> +#endif + +#include <cerrno> +#include <exception> // std::terminate() + +#include <libbuild2/diagnostics.hxx> + +using namespace std; + +namespace build2 +{ + // TLS cache of thread's task queue. + // + // Note that scheduler::task_queue struct is private. + // + static +#ifdef __cpp_thread_local + thread_local +#else + __thread +#endif + void* scheduler_queue = nullptr; + + scheduler::task_queue* scheduler:: + queue () noexcept + { + return static_cast<scheduler::task_queue*> (scheduler_queue); + } + + void scheduler:: + queue (scheduler::task_queue* q) noexcept + { + scheduler_queue = q; + } + + size_t scheduler:: + wait (size_t start_count, const atomic_count& task_count, work_queue wq) + { + // Note that task_count is a synchronization point. + // + size_t tc; + + if ((tc = task_count.load (memory_order_acquire)) <= start_count) + return tc; + + assert (max_active_ != 1); // Serial execution, nobody to wait for. + + // See if we can run some of our own tasks. + // + if (wq != work_none) + { + // If we are waiting on someone else's task count then there migh still + // be no queue (set by async()). + // + if (task_queue* tq = queue ()) + { + for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); ) + { + pop_back (*tq, ql); + + if (wq == work_one) + { + if ((tc = task_count.load (memory_order_acquire)) <= start_count) + return tc; + } + } + + // Note that empty task queue doesn't automatically mean the task + // count has been decremented (some might still be executing + // asynchronously). + // + if ((tc = task_count.load (memory_order_acquire)) <= start_count) + return tc; + } + } + + return suspend (start_count, task_count); + } + + void scheduler:: + deactivate () + { + if (max_active_ == 1) // Serial execution. + return; + + lock l (mutex_); + + active_--; + waiting_++; + progress_++; + + if (waiting_ > stat_max_waiters_) + stat_max_waiters_ = waiting_; + + // A spare active thread has become available. If there are ready masters + // or eager helpers, wake someone up. + // + if (ready_ != 0) + { + ready_condv_.notify_one (); + } + else if (queued_task_count_.load (std::memory_order_consume) != 0) + { + activate_helper (l); + } + // @@ TODO: Redo as a separate "monitoring" thread. + // + // This still doesn't work for the phase lock case where we call + // deactivate and then go wait on a condition variable: we are doing + // deadlock detection while holding the lock that prevents other + // threads from making progress! + // +#if 0 + else if (active_ == 0) + { + // We may have a deadlock which can happen because of dependency cycles. + // + // Relying on the active_ count alone is not precise enough, however: + // some threads might be transitioning between the active/waiting/ready + // states. Carefully accounting for this is not trivial, to say the + // least (especially in the face of spurious wakeups). So we are going + // to do a "fuzzy" deadlock detection by measuring "progress". The idea + // is that those transitions should be pretty short-lived and so if we + // wait for a couple of hundreds context switches, then we should be + // able to distinguish a real deadlock from the transition case. + // + size_t p (progress_); + + for (size_t i (0); i != 100; ++i) + { + l.unlock (); + this_thread::yield () is not enough. + l.lock (); + + if (p != progress_) + break; + } + + if (p == progress_) + { + // Reactivate and fail. + // + waiting_--; + active_++; + + // Shutting things down cleanly is tricky: we could have handled it in + // the scheduler (e.g., by setting a flag and then waking everyone up, + // similar to shutdown). But there could also be "external waiters" + // that have called deactivate() -- we have no way to wake those up. + // So for now we are going to abort (the nice thing about abort is if + // this is not a dependency cycle, then we have a core to examine). + // + error << "deadlock detected, can be caused by a dependency cycle" << + info << "re-run with -s to diagnose dependency cycles"; + + std::terminate (); + } + } +#endif + } + + void scheduler:: + activate (bool collision) + { + if (max_active_ == 1) // Serial execution. + return; + + lock l (mutex_); + + if (collision) + stat_wait_collisions_++; + + // If we have spare active threads, then become active. Otherwise it + // enters the ready queue. + // + waiting_--; + ready_++; + progress_++; + + while (!shutdown_ && active_ >= max_active_) + ready_condv_.wait (l); + + ready_--; + active_++; + progress_++; + + if (shutdown_) + throw_generic_error (ECANCELED); + } + + void scheduler:: + sleep (const duration& d) + { + deactivate (); + + // MINGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep(). + // +#ifndef _WIN32 + this_thread::sleep_for (d); +#else + using namespace chrono; + + Sleep (static_cast<DWORD> (duration_cast<milliseconds> (d).count ())); +#endif + + activate (); + } + + size_t scheduler:: + suspend (size_t start_count, const atomic_count& task_count) + { + wait_slot& s ( + wait_queue_[ + hash<const atomic_count*> () (&task_count) % wait_queue_size_]); + + // This thread is no longer active. + // + deactivate (); + + // Note that the task count is checked while holding the lock. We also + // have to notify while holding the lock (see resume()). The aim here + // is not to end up with a notification that happens between the check + // and the wait. + // + size_t tc (0); + bool collision; + { + lock l (s.mutex); + + // We have a collision if there is already a waiter for a different + // task count. + // + collision = (s.waiters++ != 0 && s.task_count != &task_count); + + // This is nuanced: we want to always have the task count of the last + // thread to join the queue. Otherwise, if threads are leaving and + // joining the queue simultaneously, we may end up with a task count of + // a thread group that is no longer waiting. + // + s.task_count = &task_count; + + // We could probably relax the atomic access since we use a mutex for + // synchronization though this has a different tradeoff (calling wait + // because we don't see the count). + // + while (!(s.shutdown || + (tc = task_count.load (memory_order_acquire)) <= start_count)) + s.condv.wait (l); + + s.waiters--; + } + + // This thread is no longer waiting. + // + activate (collision); + + return tc; + } + + void scheduler:: + resume (const atomic_count& tc) + { + if (max_active_ == 1) // Serial execution, nobody to wakeup. + return; + + wait_slot& s ( + wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]); + + // See suspend() for why we must hold the lock. + // + lock l (s.mutex); + + if (s.waiters != 0) + s.condv.notify_all (); + } + + scheduler:: + ~scheduler () + { + try { shutdown (); } catch (system_error&) {} + } + + auto scheduler:: + wait_idle () -> lock + { + lock l (mutex_); + + assert (waiting_ == 0); + assert (ready_ == 0); + + while (active_ != init_active_ || starting_ != 0) + { + l.unlock (); + this_thread::yield (); + l.lock (); + } + + return l; + } + + size_t scheduler:: + shard_size (size_t mul, size_t div) const + { + size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4); + + // Experience shows that we want something close to 2x for small numbers, + // then reduce to 1.5x in-between, and 1x for large ones. + // + // Note that Intel Xeons are all over the map when it comes to cores (6, + // 8, 10, 12, 14, 16, 18, 20, 22). + // + return // HW threads x arch-bits (see max_threads below) + n == 0 ? 1 : // serial + // + // 2x + // + n == 1 ? 3 : + n == 2 ? 5 : + n == 4 ? 11 : + n == 6 ? 13 : + n == 8 ? 17 : // 2 x 4 + n == 16 ? 31 : // 4 x 4, 2 x 8 + // + // 1.5x + // + n == 32 ? 47 : // 4 x 8 + n == 48 ? 53 : // 6 x 8 + n == 64 ? 67 : // 8 x 8 + n == 80 ? 89 : // 10 x 8 + // + // 1x + // + n == 96 ? 101 : // 12 x 8 + n == 112 ? 127 : // 14 x 8 + n == 128 ? 131 : // 16 x 8 + n == 144 ? 139 : // 18 x 8 + n == 160 ? 157 : // 20 x 8 + n == 176 ? 173 : // 22 x 8 + n == 192 ? 191 : // 24 x 8 + n == 224 ? 223 : // 28 x 8 + n == 256 ? 251 : // 32 x 8 + n == 288 ? 271 : // 36 x 8 + n == 320 ? 313 : // 40 x 8 + n == 352 ? 331 : // 44 x 8 + n == 384 ? 367 : // 48 x 8 + n == 512 ? 499 : // 64 x 8 + n - 1; // Assume it is even. + } + + void scheduler:: + startup (size_t max_active, + size_t init_active, + size_t max_threads, + size_t queue_depth, + optional<size_t> max_stack) + { + // Lock the mutex to make sure our changes are visible in (other) active + // threads. + // + lock l (mutex_); + + max_stack_ = max_stack; + + // Use 8x max_active on 32-bit and 32x max_active on 64-bit. Unless we + // were asked to run serially. + // + if (max_threads == 0) + max_threads = (max_active == 1 ? 1 : + sizeof (void*) < 8 ? 8 : 32) * max_active; + + assert (shutdown_ && + init_active != 0 && + init_active <= max_active && + max_active <= max_threads); + + active_ = init_active_ = init_active; + max_active_ = orig_max_active_ = max_active; + max_threads_ = max_threads; + + // This value should be proportional to the amount of hardware concurrency + // we have (no use queing things up if helpers cannot keep up). Note that + // the queue entry is quite sizable. + // + // The relationship is as follows: we want to have a deeper queue if the + // tasks take long (e.g., compilation) and shorter if they are quick (e.g, + // test execution). If the tasks are quick then the synchronization + // overhead required for queuing/dequeuing things starts to dominate. + // + task_queue_depth_ = queue_depth != 0 + ? queue_depth + : max_active * 4; + + queued_task_count_.store (0, memory_order_relaxed); + + if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0) + wait_queue_.reset (new wait_slot[wait_queue_size_]); + + // Reset counters. + // + stat_max_waiters_ = 0; + stat_wait_collisions_ = 0; + + progress_ = 0; + + for (size_t i (0); i != wait_queue_size_; ++i) + wait_queue_[i].shutdown = false; + + shutdown_ = false; + } + + void scheduler:: + tune (size_t max_active) + { + if (max_active == 0) + max_active = orig_max_active_; + + assert (max_active >= init_active_ && + max_active <= orig_max_active_); + + // The scheduler must not be active though some threads might still be + // comming off from finishing a task. So we busy-wait for them. + // + lock l (wait_idle ()); + + max_active_ = max_active; + } + + auto scheduler:: + shutdown () -> stat + { + // Our overall approach to shutdown is not to try and stop everything as + // quickly as possible but rather to avoid performing any tasks. This + // avoids having code littered with if(shutdown) on every other line. + + stat r; + lock l (mutex_); + + if (!shutdown_) + { + // Collect statistics. + // + r.thread_helpers = helpers_; + + // Signal shutdown. + // + shutdown_ = true; + + for (size_t i (0); i != wait_queue_size_; ++i) + { + wait_slot& ws (wait_queue_[i]); + lock l (ws.mutex); + ws.shutdown = true; + } + + for (task_queue& tq: task_queues_) + { + lock ql (tq.mutex); + r.task_queue_full += tq.stat_full; + tq.shutdown = true; + } + + // Wait for all the helpers to terminate waking up any thread that + // sleeps. + // + while (helpers_ != 0) + { + bool i (idle_ != 0); + bool r (ready_ != 0); + bool w (waiting_ != 0); + + l.unlock (); + + if (i) + idle_condv_.notify_all (); + + if (r) + ready_condv_.notify_all (); + + if (w) + for (size_t i (0); i != wait_queue_size_; ++i) + wait_queue_[i].condv.notify_all (); + + this_thread::yield (); + l.lock (); + } + + // Free the memory. + // + wait_queue_.reset (); + task_queues_.clear (); + + r.thread_max_active = orig_max_active_; + r.thread_max_total = max_threads_; + r.thread_max_waiting = stat_max_waiters_; + + r.task_queue_depth = task_queue_depth_; + r.task_queue_remain = queued_task_count_.load (memory_order_consume); + + r.wait_queue_slots = wait_queue_size_; + r.wait_queue_collisions = stat_wait_collisions_; + } + + return r; + } + + scheduler::monitor_guard scheduler:: + monitor (atomic_count& c, size_t t, function<size_t (size_t)> f) + { + assert (monitor_count_ == nullptr && t != 0); + + // While the scheduler must not be active, some threads might still be + // comming off from finishing a task and trying to report progress. So we + // busy-wait for them (also in ~monitor_guard()). + // + lock l (wait_idle ()); + + monitor_count_ = &c; + monitor_tshold_.store (t, memory_order_relaxed); + monitor_init_ = c.load (memory_order_relaxed); + monitor_func_ = move (f); + + return monitor_guard (this); + } + + void scheduler:: + activate_helper (lock& l) + { + if (!shutdown_) + { + if (idle_ != 0) + { + idle_condv_.notify_one (); + } + // + // Ignore the max_threads value if we have queued tasks but no active + // threads. This means everyone is waiting for something to happen but + // nobody is doing anything (e.g., working the queues). This, for + // example, can happen if a thread waits for a task that is in its queue + // but is below the mark. + // + else if (init_active_ + helpers_ < max_threads_ || + (active_ == 0 && + queued_task_count_.load (memory_order_consume) != 0)) + { + create_helper (l); + } + } + } + + void scheduler:: + create_helper (lock& l) + { + helpers_++; + starting_++; + l.unlock (); + + // Restore the counters if the thread creation fails. + // + struct guard + { + lock* l; + size_t& h; + size_t& s; + + ~guard () {if (l != nullptr) {l->lock (); h--; s--;}} + + } g {&l, helpers_, starting_}; + + // For some platforms/compilers the default stack size for newly created + // threads may differ from that of the main thread. Here are the default + // main/new thread sizes (in KB) for some of them: + // + // Linux : 8192 / 8196 + // FreeBSD : 524288 / 2048 + // MacOS : 8192 / 512 + // MinGW : 2048 / 2048 + // VC : 1024 / 1024 + // + // Provided the main thread size is less-equal than + // LIBBUILD2_SANE_STACK_SIZE (which defaults to + // sizeof(void*)*LIBBUILD2_DEFAULT_STACK_SIZE), we make sure that the new + // thread stack is the same as for the main thread. Otherwise, we cap it + // at LIBBUILD2_DEFAULT_STACK_SIZE (default: 8MB). This can also be + // overridden at runtime with the --max-stack build2 driver option + // (remember to update its documentation of changing anything here). + // + // On Windows the stack size is the same for all threads and is customized + // at the linking stage (see build2/buildfile). Thus neither *_STACK_SIZE + // nor --max-stack have any effect here. + // + // On Linux, FreeBSD and MacOS there is no way to change it once and for + // all newly created threads. Thus we will use pthreads, creating threads + // with the stack size of the current thread. This way all threads will + // inherit the main thread's stack size (since the first helper is always + // created by the main thread). + // + // Note also the interaction with our backtrace functionality: in order to + // get the complete stack trace we let unhandled exceptions escape the + // thread function expecting the runtime to still call std::terminate. In + // particular, having a noexcept function anywhere on the exception's path + // causes the stack trace to be truncated, at least on Linux. + // +#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) + +#ifndef LIBBUILD2_DEFAULT_STACK_SIZE +# define LIBBUILD2_DEFAULT_STACK_SIZE 8388608 // 8MB +#endif + +#ifndef LIBBUILD2_SANE_STACK_SIZE +# define LIBBUILD2_SANE_STACK_SIZE (sizeof(void*) * LIBBUILD2_DEFAULT_STACK_SIZE) +#endif + + // Auto-deleter. + // + struct attr_deleter + { + void + operator() (pthread_attr_t* a) const + { + int r (pthread_attr_destroy (a)); + + // We should be able to destroy the valid attributes object, unless + // something is severely damaged. + // + assert (r == 0); + } + }; + + // Calculate the current thread stack size. Don't forget to update #if + // conditions above when adding the stack size customization for a new + // platforms/compilers. + // + size_t stack_size; + { +#ifdef __linux__ + // Note that the attributes must not be initialized. + // + pthread_attr_t attr; + int r (pthread_getattr_np (pthread_self (), &attr)); + + if (r != 0) + throw_system_error (r); + + unique_ptr<pthread_attr_t, attr_deleter> ad (&attr); + r = pthread_attr_getstacksize (&attr, &stack_size); + + if (r != 0) + throw_system_error (r); + +#elif defined(__FreeBSD__) + pthread_attr_t attr; + int r (pthread_attr_init (&attr)); + + if (r != 0) + throw_system_error (r); + + unique_ptr<pthread_attr_t, attr_deleter> ad (&attr); + r = pthread_attr_get_np (pthread_self (), &attr); + + if (r != 0) + throw_system_error (r); + + r = pthread_attr_getstacksize (&attr, &stack_size); + + if (r != 0) + throw_system_error (r); + +#else // defined(__APPLE__) + stack_size = pthread_get_stacksize_np (pthread_self ()); +#endif + } + + // Cap the size if necessary. + // + if (max_stack_) + { + if (*max_stack_ != 0 && stack_size > *max_stack_) + stack_size = *max_stack_; + } + else if (stack_size > LIBBUILD2_SANE_STACK_SIZE) + stack_size = LIBBUILD2_DEFAULT_STACK_SIZE; + + pthread_attr_t attr; + int r (pthread_attr_init (&attr)); + + if (r != 0) + throw_system_error (r); + + unique_ptr<pthread_attr_t, attr_deleter> ad (&attr); + + // Create the thread already detached. + // + r = pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + + if (r != 0) + throw_system_error (r); + + r = pthread_attr_setstacksize (&attr, stack_size); + + if (r != 0) + throw_system_error (r); + + pthread_t t; + r = pthread_create (&t, &attr, helper, this); + + if (r != 0) + throw_system_error (r); +#else + thread t (helper, this); + t.detach (); +#endif + + g.l = nullptr; // Disarm. + } + + void* scheduler:: + helper (void* d) + { + scheduler& s (*static_cast<scheduler*> (d)); + + // Note that this thread can be in an in-between state (not active or + // idle) but only while holding the lock. Which means that if we have the + // lock then we can account for all of them (this is important during + // shutdown). Except when the thread is just starting, before acquiring + // the lock for the first time, which we handle with the starting count. + // + lock l (s.mutex_); + s.starting_--; + + while (!s.shutdown_) + { + // If there is a spare active thread, become active and go looking for + // some work. + // + if (s.active_ < s.max_active_) + { + s.active_++; + + while (s.queued_task_count_.load (memory_order_consume) != 0) + { + // Queues are never removed which means we can get the current range + // and release the main lock while examining each of them. + // + auto it (s.task_queues_.begin ()); + size_t n (s.task_queues_.size ()); // Different to end(). + l.unlock (); + + // Note: we have to be careful not to advance the iterator past the + // last element (since what's past could be changing). + // + for (size_t i (0);; ++it) + { + task_queue& tq (*it); + + for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); ) + s.pop_front (tq, ql); + + if (++i == n) + break; + } + + l.lock (); + } + + s.active_--; + + // While executing the tasks a thread might have become ready. + // + if (s.ready_ != 0) + s.ready_condv_.notify_one (); + } + + // Become idle and wait for a notification. + // + s.idle_++; + s.idle_condv_.wait (l); + s.idle_--; + } + + s.helpers_--; + return nullptr; + } + + auto scheduler:: + create_queue () -> task_queue& + { + // Note that task_queue_depth is immutable between startup() and + // shutdown() (but see join()). + // + task_queue* tq; + { + lock l (mutex_); + task_queues_.emplace_back (task_queue_depth_); + tq = &task_queues_.back (); + tq->shutdown = shutdown_; + } + + queue (tq); + return *tq; + } +} |