author    Boris Kolpackov <boris@codesynthesis.com>  2016-12-09 17:29:27 +0200
committer Boris Kolpackov <boris@codesynthesis.com>  2016-12-09 17:29:27 +0200
commit    ea22643b2217921df74ea14df47d7c83987d5761
tree      91480771997be1b7f92f46ee404c266e0f4dcd76
parent    1a9d610051cd48c98fb71a570a0871b4e073cec9
Initial parallel scheduler implementation, use to run testscripts
Diffstat (limited to 'build2/scheduler')
 build2/scheduler | 453 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 453 insertions(+), 0 deletions(-)
diff --git a/build2/scheduler b/build2/scheduler
new file mode 100644
index 0000000..99e9524
--- /dev/null
+++ b/build2/scheduler
@@ -0,0 +1,453 @@
+// file : build2/scheduler -*- C++ -*-
+// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
+// license : MIT; see accompanying LICENSE file
+
+#ifndef BUILD2_SCHEDULER
+#define BUILD2_SCHEDULER
+
+#include <mutex>
+#include <tuple>
+#include <atomic>
+#include <cerrno>
+#include <thread> // thread::hardware_concurrency()
+#include <cassert>
+#include <utility> // index_sequence
+#include <type_traits> // aligned_storage, etc
+#include <system_error>
+#include <condition_variable>
+
+#include <build2/types>
+#include <build2/utility>
+
+namespace build2
+{
+  // Scheduler of tasks and threads. Works best for "substantial" tasks
+  // (e.g., running a process), where the thread synchronization overhead is
+  // negligible in comparison.
+ //
+  // A thread (called "master") may need to perform several tasks which can
+  // be done in parallel (e.g., update all the prerequisites or run all the
+  // tests). To accomplish this, the master, via a call to async(), can ask
+  // the scheduler to run a task in another thread (called "helper"). If a
+  // helper is available, then the task is executed asynchronously by that
+  // helper. Otherwise, the task is executed synchronously as part of the
+  // async() call. Once the master thread has scheduled all the tasks, it
+  // calls wait() to await their completion.
+ //
+ // The scheduler makes sure that only a certain number of threads (for
+ // example, the number of available hardware threads) are "active" at any
+ // given time (thus the reason why async() may choose to perform the task
+ // synchronously). When a master thread calls wait(), it is "suspended"
+ // until all its asynchronous tasks are completed (at which point it becomes
+ // "ready"). A suspension of a master results in either another ready master
+ // being "resumed" or another helper thread becoming available.
+ //
+ // On completion of a task a helper thread returns to the scheduler which
+ // can again lead either to a ready master being resumed (in which case the
+ // helper is suspended) or the helper becoming available to perform another
+ // task.
+ //
+ // Note that suspended threads are not reused as helpers. Rather, a new
+ // helper thread is always created if none is available. This is done to
+ // allow a ready master to continue as soon as possible. If it were reused
+ // as a helper, then it could be blocked on a nested wait() further down the
+ // stack. This means that the number of threads created by the scheduler
+ // will normally exceed the maximum active allowed. It should not, however,
+ // get excessive provided the tasks are not too deeply nested.
+ //
+ // @@ Actually, this is not quite correct: this seems to be a function of
+ // both depth and width of the task tree. Perhaps we should use a hybrid
+ // approach: when the number of helper threads reaches a certain limit
+ // we switch to reusing suspended threads? Also, a thread can rake its
+  //    own and its subtasks' queues directly in wait() since wait() won't
+ // return until they are done.
+ //
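+  // For example, a minimal usage sketch (files and update() here are
+  // stand-ins, not part of this interface):
+  //
+  //   scheduler::atomic_count task_count (0);
+  //
+  //   for (file& f: files)
+  //     sched.async (task_count, [] (file& f) {update (f);}, ref (f));
+  //
+  //   sched.wait (task_count); // "Out" results can be examined once the
+  //                            // count reaches 0.
+  //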
+ class scheduler
+ {
+ public:
+ using atomic_count = std::atomic<size_t>;
+
+ // F should return void and not throw any exceptions. The way the result
+ // of a task is communicated back to the master thread is ad hoc, usually
+ // via "out" arguments. Such result(s) can only be retrieved by the master
+ // once its task count reaches 0.
+ //
+ // The argument passing semantics is the same as for std::thread. In
+ // particular, lvalue-references are passed as copies (use ref()/cref()
+ // for the by-reference semantics).
+ //
+    // If the scheduler is shut down, throw system_error(ECANCELED).
+ //
+ template <typename F, typename... A>
+ void
+ async (atomic_count& task_count, F&& f, A&&... a)
+ {
+ using task = task_type<F, A...>;
+
+ static_assert (sizeof (task) <= sizeof (task_data),
+ "insufficient space");
+
+ static_assert (std::is_trivially_destructible<task>::value,
+ "not trivially destructible");
+
+ // Push into the queue unless we are running serially or the queue is
+ // full.
+ //
+ task_data* td (nullptr);
+
+ if (max_active_ != 1)
+ {
+ task_queue& tq (task_queue_ != nullptr
+ ? *task_queue_
+ : create_queue ());
+
+ lock ql (tq.mutex);
+
+ if (tq.shutdown)
+ throw system_error (ECANCELED, std::system_category ());
+
+ if ((td = push (tq)) != nullptr)
+ {
+ // Package the task.
+ //
+ new (&td->data) task {
+ &task_count,
+ decay_copy (forward <F> (f)),
+ {decay_copy (forward <A> (a))...}};
+
+ td->thunk = &task_thunk<F, A...>;
+ }
+ else
+ tq.stat_full++;
+ }
+
+ // If serial/full, then run the task synchronously. In this case
+ // there is no need to mess with task count.
+ //
+ if (td == nullptr)
+ {
+ forward<F> (f) (forward<A> (a)...);
+ return;
+ }
+
+ // Increment the task count.
+ //
+ task_count.fetch_add (1, std::memory_order_release);
+
+ lock l (mutex_);
+ task_ = true;
+
+ // If there is a spare active thread, wake up (or create) the helper.
+ //
+ if (active_ < max_active_)
+ activate_helper (l);
+ }
+
+    // Wait until the task count reaches 0. If the scheduler is shut down
+    // while waiting, throw system_error(ECANCELED).
+ //
+ void
+ wait (atomic_count& task_count);
+
+ // Startup and shutdown.
+ //
+ public:
+ // Unless already shut down, call shutdown() but ignore errors.
+ //
+ ~scheduler ();
+
+ // Create a shut down scheduler.
+ //
+ scheduler () = default;
+
+ // Create a started up scheduler.
+ //
+    // The init_active argument is the number of threads assumed to be
+    // already active (e.g., the calling thread). It must not be 0 (since
+    // someone has to schedule the first task).
+    //
+    // If max_threads is unspecified, then a generally appropriate default
+    // limit is used.
+ //
+ scheduler (size_t max_active,
+ size_t init_active = 1,
+ size_t max_threads = 0)
+ {
+ startup (max_active, init_active, max_threads);
+ }
+
+ // Note: naturally not thread-safe.
+ //
+ void
+ startup (size_t max_active,
+ size_t init_active = 1,
+ size_t max_threads = 0);
+
+ // Wait for all the helper threads to terminate. Throw system_error on
+ // failure. Note that the initially active threads are not waited for.
+ // Return scheduling statistics.
+ //
+ struct stat
+ {
+ size_t thread_max_active = 0; // max # of active threads allowed.
+ size_t thread_max_total = 0; // max # of total threads allowed.
+ size_t thread_helpers = 0; // # of helper threads created.
+ size_t thread_max_waiting = 0; // max # of waiters at any time.
+
+ size_t task_queue_depth = 0; // # of entries in a queue (capacity).
+ size_t task_queue_full = 0; // # of times task queue was full.
+
+ size_t wait_queue_slots = 0; // # of wait slots (buckets).
+ size_t wait_queue_collisions = 0; // # of times slot had been occupied.
+ };
+
+ stat
+ shutdown ();
+
+ // Return the number of hardware threads or 0 if unable to determine.
+ //
+ static size_t
+ hardware_concurrency ()
+ {
+ return std::thread::hardware_concurrency ();
+ }
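+
+    // For example, a sketch (note that hardware_concurrency() can return 0,
+    // in which case a real caller would fall back to some default):
+    //
+    //   scheduler s (scheduler::hardware_concurrency ());
+    //
+    //   ... // Schedule tasks and wait for them.
+    //
+    //   scheduler::stat st (s.shutdown ()); // Waits for helpers to
+    //                                       // terminate.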
+
+ private:
+ using lock = std::unique_lock<std::mutex>;
+
+ void
+ activate_helper (lock&);
+
+ void
+ create_helper (lock&);
+
+    // We restrict ourselves to a single pointer as an argument in the hope
+    // of a small object optimization.
+ //
+ static void
+ helper (void*);
+
+ void
+ suspend (atomic_count& task_count);
+
+ void
+ resume (atomic_count& task_count);
+
+ // Task encapsulation.
+ //
+ template <typename F, typename... A>
+ struct task_type
+ {
+ atomic_count* task_count;
+ std::decay_t<F> func;
+ std::tuple<std::decay_t<A>...> args;
+
+ template <size_t... i>
+ void
+ thunk (std::index_sequence<i...>)
+ {
+ move (func) (std::get<i> (move (args))...);
+ }
+ };
+
+ template <typename F, typename... A>
+ static void
+ task_thunk (scheduler& s, lock& ql, void* td)
+ {
+ using task = task_type<F, A...>;
+
+ // Move the data and release the lock.
+ //
+ task t (move (*static_cast<task*> (td)));
+ ql.unlock ();
+
+ t.thunk (std::index_sequence_for<A...> ());
+
+ atomic_count& tc (*t.task_count);
+ if (--tc == 0)
+ s.resume (tc); // Resume a waiter, if any.
+ }
+
+ template <typename T>
+ static std::decay_t<T>
+ decay_copy (T&& x) {return forward<T> (x);}
+
+ private:
+ std::mutex mutex_;
+
+ bool shutdown_ = true; // Shutdown flag.
+ bool task_ = false; // Task queued flag (see below).
+
+    // The constraints that we must maintain:
+    //
+    //   active                  <= max_active
+    //   (init_active + helpers) <= max_threads
+    //
+    // Note that the three limits below (init_active_, max_active_, and
+    // max_threads_) are immutable between startup() and shutdown() and so
+    // can be accessed without a lock (@@ What about the case of multiple
+    // initially active?).
+ //
+ size_t init_active_ = 0; // Initially active threads.
+ size_t max_active_ = 0; // Maximum number of active threads.
+ size_t max_threads_ = 0; // Maximum number of total threads.
+
+ size_t helpers_ = 0; // Number of helper threads created so far.
+
+ // Every thread that we manage must be accounted for in one of these
+ // counters. And their sum should equal (init_active + helpers).
+ //
+ size_t active_ = 0; // Active master threads executing a task.
+ size_t idle_ = 0; // Idle helper threads waiting for a task.
+ size_t waiting_ = 0; // Suspended master threads waiting for their tasks.
+ size_t ready_ = 0; // Ready master thread waiting to become active.
+ size_t starting_ = 0; // Helper threads starting up.
+
+ std::condition_variable idle_condv_; // Idle helpers queue.
+ std::condition_variable ready_condv_; // Ready masters queue.
+
+ // Statistics counters.
+ //
+ size_t stat_max_waiters_;
+ size_t stat_wait_collisions_;
+
+ // Wait queue.
+ //
+ // A wait slot blocks a bunch of threads. When they are (all) unblocked,
+ // they re-examine their respective conditions and either carry on or
+ // block again.
+ //
+ // The wait queue is a shard of slots. A thread picks a slot based on the
+ // address of its task count variable. How many slots do we need? This
+ // depends on the number of waiters that we can have which cannot be
+ // greater than the total number of threads.
+ //
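+    // The actual slot selection lives in the implementation file; it could
+    // presumably be along these lines (a sketch, not the real code):
+    //
+    //   wait_slot& ws (
+    //     wait_queue_[std::hash<atomic_count*> () (&task_count) %
+    //                 wait_queue_size_]);
+    //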
+ struct wait_slot
+ {
+ std::mutex mutex;
+ std::condition_variable condv;
+ size_t waiters = 0;
+ bool shutdown = true;
+ };
+
+ size_t wait_queue_size_; // Multiple of max_threads.
+ unique_ptr<wait_slot[]> wait_queue_;
+
+ // Task queue.
+ //
+ // Each queue has its own mutex. If the task_ flag above is true then
+ // there *might* be a task in one of the queues. If it is false, then
+ // it means there are either no tasks or someone is busy working on
+ // them.
+ //
+ // For now we only support trivially-destructible tasks.
+ //
+ struct task_data
+ {
+ std::aligned_storage<sizeof (void*) * 7>::type data;
+ void (*thunk) (scheduler&, lock&, void*);
+ };
+
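+    // In other words, task_data is a type-erased slot: async() constructs a
+    // concrete task_type<F, A...> in data via placement new and points
+    // thunk at the matching task_thunk<F, A...> instantiation, which casts
+    // it back:
+    //
+    //   new (&td->data) task {...};        // In async().
+    //   td->thunk = &task_thunk<F, A...>;
+    //   ...
+    //   td.thunk (*this, ql, &td.data);    // In pop_front()/pop_back().
+    //
+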
+ // We have two requirements: Firstly, we want to keep the master thread
+    // (the one that called wait()) busy working through its own queue for as
+ // long as possible before (if at all) it "reincarnates" as a helper. The
+ // main reason for this is the limited number of helpers we can create.
+ //
+ // Secondly, we don't want to block wait() longer than necessary since the
+ // master thread can do some work with the result. Plus, overall, we want
+ // to "unwind" task hierarchies as soon as possible since they hold up
+    // resources such as threads' stacks. All this means that the master thread
+ // can only work through tasks that it has queued at this "level" of the
+ // async()/wait() calls since we know that wait() cannot return until
+ // they are done.
+ //
+ // To satisfy the first requirement, the master and helper threads get the
+ // tasks from different ends of the queue: master from the back while
+ // helpers from the front. And the master always adds new tasks to the
+ // back.
+ //
+ // To satisfy the second requirement, the master thread stores the index
+ // of the first task it has queued at this "level" and makes sure it
+    // doesn't try to dequeue any task beyond that.
+ //
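+    // To illustrate (h = head, s = stop, t = tail; x = queued task):
+    //
+    //   [x|x|x|x| | | | ]
+    //    h   s   t
+    //
+    // Here elements 2-3 (from s up to t) were queued at the current level:
+    // the master may take them from the back with pop_back(), which stops
+    // at s. Elements 0-1 were queued at an outer level and, at this level,
+    // can only be taken from the front (by helpers) with pop_front().
+    //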
+ size_t task_queue_depth_; // Multiple of max_active.
+
+ struct task_queue
+ {
+ std::mutex mutex;
+ bool shutdown = false;
+
+      size_t stat_full = 0; // Number of times push() returned NULL.
+
+ // head <= stop <= tail
+ //
+ size_t head = 0; // Index of the first element.
+ size_t stop = 0; // Index of the element to stop at in pop_back().
+ size_t tail = 0; // Index of the one past last element.
+
+ unique_ptr<task_data[]> data;
+
+ task_queue (size_t depth): data (new task_data[depth]) {}
+ };
+
+ // Task queue API. Expects the queue mutex to be locked.
+ //
+
+ // Push a new task to the queue returning a pointer to the task data to be
+ // filled or NULL if the queue is full.
+ //
+ task_data*
+ push (task_queue& tq)
+ {
+ return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+ }
+
+ bool
+ empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+
+ void
+ pop_front (task_queue& tq, lock& ql)
+ {
+ task_data& td (tq.data[tq.head++]);
+
+      if (tq.head == tq.tail)
+        tq.stop = tq.head = tq.tail = 0; // Reset.
+      else if (tq.head > tq.stop)
+        tq.stop = tq.head; // Helpers consumed past the master's stop
+                           // point; restore the head <= stop invariant.
+
+ // The thunk moves the task data to its stack, releases the lock,
+ // and continues to execute the task.
+ //
+ td.thunk (*this, ql, &td.data);
+ }
+
+ bool
+ empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+
+ void
+ pop_back (task_queue& tq, lock& ql)
+ {
+ task_data& td (tq.data[--tq.tail]);
+
+ if (tq.head == tq.tail)
+        tq.stop = tq.head = tq.tail = 0; // Reset.
+
+ td.thunk (*this, ql, &td.data);
+ }
+
+ // Each thread has its own queue. Instead of allocating all max_threads of
+ // them up front, we will reserve the space but will only construct queues
+ // on demand.
+ //
+ vector<unique_ptr<task_queue>> task_queues_;
+
+ // TLS cache of each thread's task queue.
+ //
+ thread_local static task_queue* task_queue_;
+
+ task_queue&
+ create_queue ();
+ };
+
+ // Main (and only) scheduler. Started up and shut down in main().
+ //
+ extern scheduler sched;
+}
+
+#endif // BUILD2_SCHEDULER