Diffstat (limited to 'build2/scheduler.cxx')
-rw-r--r--  build2/scheduler.cxx  802
1 file changed, 0 insertions, 802 deletions
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
deleted file mode 100644
index ad3a640..0000000
--- a/build2/scheduler.cxx
+++ /dev/null
@@ -1,802 +0,0 @@
-// file : build2/scheduler.cxx -*- C++ -*-
-// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd
-// license : MIT; see accompanying LICENSE file
-
-#include <build2/scheduler.hxx>
-
-#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
-# include <pthread.h>
-# ifdef __FreeBSD__
-# include <pthread_np.h> // pthread_attr_get_np()
-# endif
-#endif
-
-#ifndef _WIN32
-# include <thread> // this_thread::sleep_for()
-#else
-# include <libbutl/win32-utility.hxx>
-
-# include <chrono>
-#endif
-
-#include <cerrno>
-#include <exception> // std::terminate()
-
-#include <build2/diagnostics.hxx>
-
-using namespace std;
-
-namespace build2
-{
- size_t scheduler::
- wait (size_t start_count, const atomic_count& task_count, work_queue wq)
- {
- // Note that task_count is a synchronization point.
- //
- size_t tc;
-
- if ((tc = task_count.load (memory_order_acquire)) <= start_count)
- return tc;
-
- assert (max_active_ != 1); // Serial execution, nobody to wait for.
-
- // See if we can run some of our own tasks.
- //
- if (wq != work_none)
- {
-      // If we are waiting on someone else's task count then there might still
- // be no queue (set by async()).
- //
- if (task_queue* tq = task_queue_)
- {
- for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
- {
- pop_back (*tq, ql);
-
- if (wq == work_one)
- {
- if ((tc = task_count.load (memory_order_acquire)) <= start_count)
- return tc;
- }
- }
-
-        // Note that an empty task queue doesn't automatically mean the task
-        // count has been decremented (some tasks might still be executing
- // asynchronously).
- //
- if ((tc = task_count.load (memory_order_acquire)) <= start_count)
- return tc;
- }
- }
-
- return suspend (start_count, task_count);
- }
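
Note: the work_queue logic above lets a waiter drain its own task queue before suspending. A minimal stand-alone sketch of that idea follows, using hypothetical names (helper_queue, wait_helping) rather than the build2 scheduler API; it only illustrates the pattern.

// Illustrative sketch only (hypothetical names, not the build2 API): a
// waiter first works its own queue before giving up and suspending.
//
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

struct helper_queue
{
  std::mutex mutex;
  std::deque<std::function<void ()>> tasks;

  // Run one queued task if there is one; return false if the queue is empty.
  //
  bool
  run_one ()
  {
    std::function<void ()> t;
    {
      std::lock_guard<std::mutex> l (mutex);

      if (tasks.empty ())
        return false;

      t = std::move (tasks.back ());
      tasks.pop_back ();
    }

    t (); // Execute outside the queue lock.
    return true;
  }
};

// Wait for count to drop to start_count, working our own queued tasks in the
// meantime (similar in spirit to the work-while-waiting logic above).
//
inline std::size_t
wait_helping (std::size_t start_count,
              const std::atomic<std::size_t>& count,
              helper_queue& q)
{
  std::size_t c;

  while ((c = count.load (std::memory_order_acquire)) > start_count)
  {
    if (!q.run_one ())
      break; // Nothing to help with; a real scheduler would suspend here.
  }

  return c;
}
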
-
- void scheduler::
- deactivate ()
- {
- if (max_active_ == 1) // Serial execution.
- return;
-
- lock l (mutex_);
-
- active_--;
- waiting_++;
- progress_++;
-
- if (waiting_ > stat_max_waiters_)
- stat_max_waiters_ = waiting_;
-
- // A spare active thread has become available. If there are ready masters
- // or eager helpers, wake someone up.
- //
- if (ready_ != 0)
- {
- ready_condv_.notify_one ();
- }
- else if (queued_task_count_.load (std::memory_order_consume) != 0)
- {
- activate_helper (l);
- }
- // @@ TODO: Redo as a separate "monitoring" thread.
- //
- // This still doesn't work for the phase lock case where we call
- // deactivate and then go wait on a condition variable: we are doing
- // deadlock detection while holding the lock that prevents other
- // threads from making progress!
- //
-#if 0
- else if (active_ == 0)
- {
- // We may have a deadlock which can happen because of dependency cycles.
- //
- // Relying on the active_ count alone is not precise enough, however:
- // some threads might be transitioning between the active/waiting/ready
- // states. Carefully accounting for this is not trivial, to say the
- // least (especially in the face of spurious wakeups). So we are going
- // to do a "fuzzy" deadlock detection by measuring "progress". The idea
- // is that those transitions should be pretty short-lived and so if we
-      // wait for a couple of hundred context switches, then we should be
- // able to distinguish a real deadlock from the transition case.
- //
- size_t p (progress_);
-
- for (size_t i (0); i != 100; ++i)
- {
- l.unlock ();
-        this_thread::yield (); // Note: yield() alone is not enough.
- l.lock ();
-
- if (p != progress_)
- break;
- }
-
- if (p == progress_)
- {
- // Reactivate and fail.
- //
- waiting_--;
- active_++;
-
- // Shutting things down cleanly is tricky: we could have handled it in
- // the scheduler (e.g., by setting a flag and then waking everyone up,
- // similar to shutdown). But there could also be "external waiters"
- // that have called deactivate() -- we have no way to wake those up.
-        // So for now we are going to abort (the nice thing about abort is
-        // that if this is not a dependency cycle, then we have a core to
-        // examine).
- //
- error << "deadlock detected, can be caused by a dependency cycle" <<
- info << "re-run with -s to diagnose dependency cycles";
-
- std::terminate ();
- }
- }
-#endif
- }
-
- void scheduler::
- activate (bool collision)
- {
- if (max_active_ == 1) // Serial execution.
- return;
-
- lock l (mutex_);
-
- if (collision)
- stat_wait_collisions_++;
-
-    // If there is a spare active slot, then become active. Otherwise enter
-    // the ready queue and wait.
- //
- waiting_--;
- ready_++;
- progress_++;
-
- while (!shutdown_ && active_ >= max_active_)
- ready_condv_.wait (l);
-
- ready_--;
- active_++;
- progress_++;
-
- if (shutdown_)
- throw_generic_error (ECANCELED);
- }
-
- void scheduler::
- sleep (const duration& d)
- {
- deactivate ();
-
- // MINGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep().
- //
-#ifndef _WIN32
- this_thread::sleep_for (d);
-#else
- using namespace chrono;
-
- Sleep (static_cast<DWORD> (duration_cast<milliseconds> (d).count ()));
-#endif
-
- activate ();
- }
-
- size_t scheduler::
- suspend (size_t start_count, const atomic_count& task_count)
- {
- wait_slot& s (
- wait_queue_[
- hash<const atomic_count*> () (&task_count) % wait_queue_size_]);
-
- // This thread is no longer active.
- //
- deactivate ();
-
- // Note that the task count is checked while holding the lock. We also
- // have to notify while holding the lock (see resume()). The aim here
- // is not to end up with a notification that happens between the check
- // and the wait.
- //
- size_t tc (0);
- bool collision;
- {
- lock l (s.mutex);
-
- // We have a collision if there is already a waiter for a different
- // task count.
- //
- collision = (s.waiters++ != 0 && s.task_count != &task_count);
-
- // This is nuanced: we want to always have the task count of the last
- // thread to join the queue. Otherwise, if threads are leaving and
- // joining the queue simultaneously, we may end up with a task count of
- // a thread group that is no longer waiting.
- //
- s.task_count = &task_count;
-
-      // We could probably relax the atomic access since we use a mutex for
-      // synchronization, though this has a different tradeoff (we may call
-      // wait() because we don't yet see the updated count).
- //
- while (!(s.shutdown ||
- (tc = task_count.load (memory_order_acquire)) <= start_count))
- s.condv.wait (l);
-
- s.waiters--;
- }
-
- // This thread is no longer waiting.
- //
- activate (collision);
-
- return tc;
- }
-
- void scheduler::
- resume (const atomic_count& tc)
- {
-    if (max_active_ == 1) // Serial execution, nobody to wake up.
- return;
-
- wait_slot& s (
- wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
-
- // See suspend() for why we must hold the lock.
- //
- lock l (s.mutex);
-
- if (s.waiters != 0)
- s.condv.notify_all ();
- }
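
Note: suspend() and resume() form a hashed wait queue: the address of the awaited counter selects one of a fixed number of mutex/condition-variable slots, so unrelated waits only interfere when they hash to the same slot. A minimal sketch of that technique follows, with shutdown handling and collision accounting omitted; the names are illustrative, not the build2 types.

// Minimal hashed wait queue sketch (illustrative only; no shutdown support).
//
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>

struct wait_slot_sketch
{
  std::mutex mutex;
  std::condition_variable condv;
};

const std::size_t slot_count = 53; // A prime, in the spirit of shard_size().
static wait_slot_sketch slots[slot_count];

inline wait_slot_sketch&
slot_for (const std::atomic<std::size_t>& c)
{
  return slots[std::hash<const void*> () (&c) % slot_count];
}

// Block until the counter drops to start or below.
//
inline void
wait_on (std::size_t start, const std::atomic<std::size_t>& c)
{
  wait_slot_sketch& s (slot_for (c));
  std::unique_lock<std::mutex> l (s.mutex);
  s.condv.wait (l,
                [&c, start]
                {
                  return c.load (std::memory_order_acquire) <= start;
                });
}

// Called after decrementing the counter: wake whoever is parked on its slot.
//
inline void
notify_on (const std::atomic<std::size_t>& c)
{
  wait_slot_sketch& s (slot_for (c));

  // Hold the lock so the notification cannot slip in between a waiter's
  // check and its wait (see suspend()/resume() above).
  //
  std::lock_guard<std::mutex> l (s.mutex);
  s.condv.notify_all ();
}
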
-
- scheduler::
- ~scheduler ()
- {
- try { shutdown (); } catch (system_error&) {}
- }
-
- auto scheduler::
- wait_idle () -> lock
- {
- lock l (mutex_);
-
- assert (waiting_ == 0);
- assert (ready_ == 0);
-
- while (active_ != init_active_ || starting_ != 0)
- {
- l.unlock ();
- this_thread::yield ();
- l.lock ();
- }
-
- return l;
- }
-
- size_t scheduler::
- shard_size (size_t mul, size_t div) const
- {
- size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4);
-
- // Experience shows that we want something close to 2x for small numbers,
- // then reduce to 1.5x in-between, and 1x for large ones.
- //
- // Note that Intel Xeons are all over the map when it comes to cores (6,
- // 8, 10, 12, 14, 16, 18, 20, 22).
- //
- return // HW threads x arch-bits (see max_threads below)
- n == 0 ? 1 : // serial
- //
- // 2x
- //
- n == 1 ? 3 :
- n == 2 ? 5 :
- n == 4 ? 11 :
- n == 6 ? 13 :
- n == 8 ? 17 : // 2 x 4
- n == 16 ? 31 : // 4 x 4, 2 x 8
- //
- // 1.5x
- //
- n == 32 ? 47 : // 4 x 8
- n == 48 ? 53 : // 6 x 8
- n == 64 ? 67 : // 8 x 8
- n == 80 ? 89 : // 10 x 8
- //
- // 1x
- //
- n == 96 ? 101 : // 12 x 8
- n == 112 ? 127 : // 14 x 8
- n == 128 ? 131 : // 16 x 8
- n == 144 ? 139 : // 18 x 8
- n == 160 ? 157 : // 20 x 8
- n == 176 ? 173 : // 22 x 8
- n == 192 ? 191 : // 24 x 8
- n == 224 ? 223 : // 28 x 8
- n == 256 ? 251 : // 32 x 8
- n == 288 ? 271 : // 36 x 8
- n == 320 ? 313 : // 40 x 8
- n == 352 ? 331 : // 44 x 8
- n == 384 ? 367 : // 48 x 8
- n == 512 ? 499 : // 64 x 8
- n - 1; // Assume it is even.
- }
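
Note: to make the mapping concrete, with the default multipliers (mul == 1, div == 1) on a 64-bit build and max_active equal to 16 hardware threads, max_threads comes out as 32 * 16 = 512, n as 512 / 4 = 128, and shard_size() returns 131 (the "16 x 8" entry in the 1x band).
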
-
- void scheduler::
- startup (size_t max_active,
- size_t init_active,
- size_t max_threads,
- size_t queue_depth,
- optional<size_t> max_stack)
- {
- // Lock the mutex to make sure our changes are visible in (other) active
- // threads.
- //
- lock l (mutex_);
-
- max_stack_ = max_stack;
-
-    // Use 8x max_active on 32-bit and 32x max_active on 64-bit, unless we
-    // were asked to run serially.
- //
- if (max_threads == 0)
- max_threads = (max_active == 1 ? 1 :
- sizeof (void*) < 8 ? 8 : 32) * max_active;
-
- assert (shutdown_ &&
- init_active != 0 &&
- init_active <= max_active &&
- max_active <= max_threads);
-
- active_ = init_active_ = init_active;
- max_active_ = orig_max_active_ = max_active;
- max_threads_ = max_threads;
-
- // This value should be proportional to the amount of hardware concurrency
-    // we have (no use queuing things up if helpers cannot keep up). Note that
- // the queue entry is quite sizable.
- //
- // The relationship is as follows: we want to have a deeper queue if the
-    // tasks take long (e.g., compilation) and shorter if they are quick (e.g.,
- // test execution). If the tasks are quick then the synchronization
- // overhead required for queuing/dequeuing things starts to dominate.
- //
- task_queue_depth_ = queue_depth != 0
- ? queue_depth
- : max_active * 4;
-
- queued_task_count_.store (0, memory_order_relaxed);
-
- if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
- wait_queue_.reset (new wait_slot[wait_queue_size_]);
-
- // Reset counters.
- //
- stat_max_waiters_ = 0;
- stat_wait_collisions_ = 0;
-
- progress_ = 0;
-
- for (size_t i (0); i != wait_queue_size_; ++i)
- wait_queue_[i].shutdown = false;
-
- shutdown_ = false;
- }
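
Note: a worked example of the defaults computed above, assuming a 64-bit build, max_active = 8, and max_threads and queue_depth passed as 0: max_threads becomes 32 * 8 = 256, task_queue_depth_ becomes 8 * 4 = 32, and the wait queue gets shard_size() slots for n = 256 / 4 = 64, that is, 67.
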
-
- void scheduler::
- tune (size_t max_active)
- {
- if (max_active == 0)
- max_active = orig_max_active_;
-
- assert (max_active >= init_active_ &&
- max_active <= orig_max_active_);
-
-    // The scheduler must not be active, though some threads might still be
-    // coming off from finishing a task. So we busy-wait for them.
- //
- lock l (wait_idle ());
-
- max_active_ = max_active;
- }
-
- auto scheduler::
- shutdown () -> stat
- {
-    // Our overall approach to shutdown is not to try to stop everything as
- // quickly as possible but rather to avoid performing any tasks. This
- // avoids having code littered with if(shutdown) on every other line.
-
- stat r;
- lock l (mutex_);
-
- if (!shutdown_)
- {
- // Collect statistics.
- //
- r.thread_helpers = helpers_;
-
- // Signal shutdown.
- //
- shutdown_ = true;
-
- for (size_t i (0); i != wait_queue_size_; ++i)
- {
- wait_slot& ws (wait_queue_[i]);
- lock l (ws.mutex);
- ws.shutdown = true;
- }
-
- for (task_queue& tq: task_queues_)
- {
- lock ql (tq.mutex);
- r.task_queue_full += tq.stat_full;
- tq.shutdown = true;
- }
-
-      // Wait for all the helpers to terminate, waking up any threads that
-      // are sleeping.
- //
- while (helpers_ != 0)
- {
- bool i (idle_ != 0);
- bool r (ready_ != 0);
- bool w (waiting_ != 0);
-
- l.unlock ();
-
- if (i)
- idle_condv_.notify_all ();
-
- if (r)
- ready_condv_.notify_all ();
-
- if (w)
- for (size_t i (0); i != wait_queue_size_; ++i)
- wait_queue_[i].condv.notify_all ();
-
- this_thread::yield ();
- l.lock ();
- }
-
- // Free the memory.
- //
- wait_queue_.reset ();
- task_queues_.clear ();
-
- r.thread_max_active = orig_max_active_;
- r.thread_max_total = max_threads_;
- r.thread_max_waiting = stat_max_waiters_;
-
- r.task_queue_depth = task_queue_depth_;
- r.task_queue_remain = queued_task_count_.load (memory_order_consume);
-
- r.wait_queue_slots = wait_queue_size_;
- r.wait_queue_collisions = stat_wait_collisions_;
- }
-
- return r;
- }
-
- scheduler::monitor_guard scheduler::
- monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
- {
- assert (monitor_count_ == nullptr && t != 0);
-
- // While the scheduler must not be active, some threads might still be
-    // coming off from finishing a task and trying to report progress. So we
- // busy-wait for them (also in ~monitor_guard()).
- //
- lock l (wait_idle ());
-
- monitor_count_ = &c;
- monitor_tshold_.store (t, memory_order_relaxed);
- monitor_init_ = c.load (memory_order_relaxed);
- monitor_func_ = move (f);
-
- return monitor_guard (this);
- }
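
Note: the code that consults monitor_count_ and monitor_tshold_ is not in this file, so the following is only a guess at the intended pattern, with hypothetical names: whenever the monitored counter advances past the current threshold, the callback is invoked with the current value and returns the next threshold to watch for.

// Hypothetical sketch of threshold-based progress monitoring; not the
// build2 call site, which lives outside this file.
//
#include <atomic>
#include <cstddef>
#include <functional>

struct progress_monitor
{
  std::atomic<std::size_t>* count = nullptr;      // Monitored counter.
  std::atomic<std::size_t> threshold {0};         // Next value to react to.
  std::function<std::size_t (std::size_t)> func;  // Returns the new threshold.

  // Meant to be called by whoever increments the counter.
  //
  void
  tick ()
  {
    std::size_t c (count->fetch_add (1, std::memory_order_relaxed) + 1);
    std::size_t t (threshold.load (std::memory_order_relaxed));

    if (t != 0 && c >= t)
      threshold.store (func (c), std::memory_order_relaxed);
  }
};
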
-
- void scheduler::
- activate_helper (lock& l)
- {
- if (!shutdown_)
- {
- if (idle_ != 0)
- {
- idle_condv_.notify_one ();
- }
- //
- // Ignore the max_threads value if we have queued tasks but no active
- // threads. This means everyone is waiting for something to happen but
- // nobody is doing anything (e.g., working the queues). This, for
- // example, can happen if a thread waits for a task that is in its queue
- // but is below the mark.
- //
- else if (init_active_ + helpers_ < max_threads_ ||
- (active_ == 0 &&
- queued_task_count_.load (memory_order_consume) != 0))
- {
- create_helper (l);
- }
- }
- }
-
- void scheduler::
- create_helper (lock& l)
- {
- helpers_++;
- starting_++;
- l.unlock ();
-
- // Restore the counters if the thread creation fails.
- //
- struct guard
- {
- lock* l;
- size_t& h;
- size_t& s;
-
- ~guard () {if (l != nullptr) {l->lock (); h--; s--;}}
-
- } g {&l, helpers_, starting_};
-
- // For some platforms/compilers the default stack size for newly created
- // threads may differ from that of the main thread. Here are the default
- // main/new thread sizes (in KB) for some of them:
- //
- // Linux : 8192 / 8196
- // FreeBSD : 524288 / 2048
- // MacOS : 8192 / 512
- // MinGW : 2048 / 2048
- // VC : 1024 / 1024
- //
-    // Provided the main thread stack size is less than or equal to
-    // BUILD2_SANE_STACK_SIZE (default: sizeof(void*) *
-    // BUILD2_DEFAULT_STACK_SIZE), we make sure that the new thread stack is
-    // the same as for the main thread. Otherwise, we cap it at
-    // BUILD2_DEFAULT_STACK_SIZE (default: 8MB). This can also be overridden
-    // at runtime with the --max-stack option (remember to update its
-    // documentation if changing anything here).
- //
- // On Windows the stack size is the same for all threads and is customized
- // at the linking stage (see build2/buildfile). Thus neither *_STACK_SIZE
-    // nor --max-stack has any effect here.
- //
- // On Linux, FreeBSD and MacOS there is no way to change it once and for
- // all newly created threads. Thus we will use pthreads, creating threads
- // with the stack size of the current thread. This way all threads will
- // inherit the main thread's stack size (since the first helper is always
- // created by the main thread).
- //
- // Note also the interaction with our backtrace functionality: in order to
- // get the complete stack trace we let unhandled exceptions escape the
- // thread function expecting the runtime to still call std::terminate. In
- // particular, having a noexcept function anywhere on the exception's path
- // causes the stack trace to be truncated, at least on Linux.
- //
-#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
-
-#ifndef BUILD2_DEFAULT_STACK_SIZE
-# define BUILD2_DEFAULT_STACK_SIZE 8388608 // 8MB
-#endif
-
-#ifndef BUILD2_SANE_STACK_SIZE
-# define BUILD2_SANE_STACK_SIZE (sizeof(void*) * BUILD2_DEFAULT_STACK_SIZE)
-#endif
-
- // Auto-deleter.
- //
- struct attr_deleter
- {
- void
- operator() (pthread_attr_t* a) const
- {
- int r (pthread_attr_destroy (a));
-
- // We should be able to destroy the valid attributes object, unless
- // something is severely damaged.
- //
- assert (r == 0);
- }
- };
-
- // Calculate the current thread stack size. Don't forget to update #if
-    // conditions above when adding the stack size customization for new
- // platforms/compilers.
- //
- size_t stack_size;
- {
-#ifdef __linux__
- // Note that the attributes must not be initialized.
- //
- pthread_attr_t attr;
- int r (pthread_getattr_np (pthread_self (), &attr));
-
- if (r != 0)
- throw_system_error (r);
-
- unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
- r = pthread_attr_getstacksize (&attr, &stack_size);
-
- if (r != 0)
- throw_system_error (r);
-
-#elif defined(__FreeBSD__)
- pthread_attr_t attr;
- int r (pthread_attr_init (&attr));
-
- if (r != 0)
- throw_system_error (r);
-
- unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
- r = pthread_attr_get_np (pthread_self (), &attr);
-
- if (r != 0)
- throw_system_error (r);
-
- r = pthread_attr_getstacksize (&attr, &stack_size);
-
- if (r != 0)
- throw_system_error (r);
-
-#else // defined(__APPLE__)
- stack_size = pthread_get_stacksize_np (pthread_self ());
-#endif
- }
-
- // Cap the size if necessary.
- //
- if (max_stack_)
- {
- if (*max_stack_ != 0 && stack_size > *max_stack_)
- stack_size = *max_stack_;
- }
- else if (stack_size > BUILD2_SANE_STACK_SIZE)
- stack_size = BUILD2_DEFAULT_STACK_SIZE;
-
- pthread_attr_t attr;
- int r (pthread_attr_init (&attr));
-
- if (r != 0)
- throw_system_error (r);
-
- unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
-
- // Create the thread already detached.
- //
- r = pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-
- if (r != 0)
- throw_system_error (r);
-
- r = pthread_attr_setstacksize (&attr, stack_size);
-
- if (r != 0)
- throw_system_error (r);
-
- pthread_t t;
- r = pthread_create (&t, &attr, helper, this);
-
- if (r != 0)
- throw_system_error (r);
-#else
- thread t (helper, this);
- t.detach ();
-#endif
-
- g.l = nullptr; // Disarm.
- }
-
- void* scheduler::
- helper (void* d)
- {
- scheduler& s (*static_cast<scheduler*> (d));
-
- // Note that this thread can be in an in-between state (not active or
- // idle) but only while holding the lock. Which means that if we have the
- // lock then we can account for all of them (this is important during
- // shutdown). Except when the thread is just starting, before acquiring
- // the lock for the first time, which we handle with the starting count.
- //
- lock l (s.mutex_);
- s.starting_--;
-
- while (!s.shutdown_)
- {
- // If there is a spare active thread, become active and go looking for
- // some work.
- //
- if (s.active_ < s.max_active_)
- {
- s.active_++;
-
- while (s.queued_task_count_.load (memory_order_consume) != 0)
- {
-          // Queues are never removed, which means we can get the current range
- // and release the main lock while examining each of them.
- //
- auto it (s.task_queues_.begin ());
-          size_t n (s.task_queues_.size ()); // Not end(): it may grow.
- l.unlock ();
-
- // Note: we have to be careful not to advance the iterator past the
- // last element (since what's past could be changing).
- //
- for (size_t i (0);; ++it)
- {
- task_queue& tq (*it);
-
- for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
- s.pop_front (tq, ql);
-
- if (++i == n)
- break;
- }
-
- l.lock ();
- }
-
- s.active_--;
-
- // While executing the tasks a thread might have become ready.
- //
- if (s.ready_ != 0)
- s.ready_condv_.notify_one ();
- }
-
- // Become idle and wait for a notification.
- //
- s.idle_++;
- s.idle_condv_.wait (l);
- s.idle_--;
- }
-
- s.helpers_--;
- return nullptr;
- }
-
-#ifdef __cpp_thread_local
- thread_local
-#else
- __thread
-#endif
- scheduler::task_queue* scheduler::task_queue_ = nullptr;
-
- auto scheduler::
- create_queue () -> task_queue&
- {
- // Note that task_queue_depth is immutable between startup() and
- // shutdown() (but see join()).
- //
- task_queue* tq;
- {
- lock l (mutex_);
- task_queues_.emplace_back (task_queue_depth_);
- tq = &task_queues_.back ();
- tq->shutdown = shutdown_;
- }
-
- task_queue_ = tq;
- return *tq;
- }
-}