From ea22643b2217921df74ea14df47d7c83987d5761 Mon Sep 17 00:00:00 2001
From: Boris Kolpackov
Date: Fri, 9 Dec 2016 17:29:27 +0200
Subject: Initial parallel scheduler implementation, use to run testscripts

---
 build2/scheduler | 453 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 453 insertions(+)
 create mode 100644 build2/scheduler

diff --git a/build2/scheduler b/build2/scheduler
new file mode 100644
index 0000000..99e9524
--- /dev/null
+++ b/build2/scheduler
@@ -0,0 +1,453 @@
+// file      : build2/scheduler -*- C++ -*-
+// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
+// license   : MIT; see accompanying LICENSE file
+
+#ifndef BUILD2_SCHEDULER
+#define BUILD2_SCHEDULER
+
+#include
+#include
+#include
+#include
+#include
+#include // aligned_storage, etc
+#include
+
+#include <build2/types>
+#include <build2/utility>
+
+namespace build2
+{
+  // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g.,
+  // running a process), where in comparison thread synchronization overhead
+  // is negligible.
+  //
+  // A thread (called "master") may need to perform several tasks which can be
+  // done in parallel (e.g., update all the prerequisites or run all the
+  // tests). To accomplish this, the master, via a call to async(), can ask the
+  // scheduler to run a task in another thread (called "helper"). If a helper
+  // is available, then the task is executed asynchronously by such a helper.
+  // Otherwise, the task is executed synchronously as part of the async()
+  // call. Once the master thread has scheduled all the tasks, it calls wait()
+  // to await their completion.
+  //
+  // The scheduler makes sure that only a certain number of threads (for
+  // example, the number of available hardware threads) are "active" at any
+  // given time (thus the reason why async() may choose to perform the task
+  // synchronously). When a master thread calls wait(), it is "suspended"
+  // until all its asynchronous tasks are completed (at which point it becomes
+  // "ready"). A suspension of a master results in either another ready master
+  // being "resumed" or another helper thread becoming available.
+  //
+  // On completion of a task a helper thread returns to the scheduler which
+  // can again lead either to a ready master being resumed (in which case the
+  // helper is suspended) or the helper becoming available to perform another
+  // task.
+  //
+  // Note that suspended threads are not reused as helpers. Rather, a new
+  // helper thread is always created if none is available. This is done to
+  // allow a ready master to continue as soon as possible. If it were reused
+  // as a helper, then it could be blocked on a nested wait() further down the
+  // stack. This means that the number of threads created by the scheduler
+  // will normally exceed the maximum active allowed. It should not, however,
+  // get excessive provided the tasks are not too deeply nested.
+  //
+  // @@ Actually, this is not quite correct: this seems to be a function of
+  //    both depth and width of the task tree. Perhaps we should use a hybrid
+  //    approach: when the number of helper threads reaches a certain limit
+  //    we switch to reusing suspended threads? Also, a thread can rake its
+  //    own and its subtasks' queues directly in wait() since wait() won't
+  //    return until they are done.
+  //
+  class scheduler
+  {
+  public:
+    using atomic_count = std::atomic<size_t>;
+
+    // F should return void and not throw any exceptions. The way the result
+    // of a task is communicated back to the master thread is ad hoc, usually
+    // via "out" arguments. Such result(s) can only be retrieved by the master
+    // once its task count reaches 0.
+    //
+    // The argument passing semantics is the same as for std::thread. In
+    // particular, lvalue-references are passed as copies (use ref()/cref()
+    // for the by-reference semantics).
+    //
+    // If the scheduler is shut down, throw system_error(ECANCELED).
+    //
+    template <typename F, typename... A>
+    void
+    async (atomic_count& task_count, F&& f, A&&... a)
+    {
+      using task = task_type<F, A...>;
+
+      static_assert (sizeof (task) <= sizeof (task_data),
+                     "insufficient space");
+
+      static_assert (std::is_trivially_destructible<task>::value,
+                     "not trivially destructible");
+
+      // Push into the queue unless we are running serially or the queue is
+      // full.
+      //
+      task_data* td (nullptr);
+
+      if (max_active_ != 1)
+      {
+        task_queue& tq (task_queue_ != nullptr
+                        ? *task_queue_
+                        : create_queue ());
+
+        lock ql (tq.mutex);
+
+        if (tq.shutdown)
+          throw system_error (ECANCELED, std::system_category ());
+
+        if ((td = push (tq)) != nullptr)
+        {
+          // Package the task.
+          //
+          new (&td->data) task {
+            &task_count,
+            decay_copy (forward<F> (f)),
+            {decay_copy (forward<A> (a))...}};
+
+          td->thunk = &task_thunk<F, A...>;
+        }
+        else
+          tq.stat_full++;
+      }
+
+      // If serial/full, then run the task synchronously. In this case
+      // there is no need to mess with the task count.
+      //
+      if (td == nullptr)
+      {
+        forward<F> (f) (forward<A> (a)...);
+        return;
+      }
+
+      // Increment the task count.
+      //
+      task_count.fetch_add (1, std::memory_order_release);
+
+      lock l (mutex_);
+      task_ = true;
+
+      // If there is a spare active thread, wake up (or create) the helper.
+      //
+      if (active_ < max_active_)
+        activate_helper (l);
+    }
+
+    // Wait until the task count reaches 0. If the scheduler is shut down
+    // while waiting, throw system_error(ECANCELED).
+    //
+    void
+    wait (atomic_count& task_count);
+
+    // Startup and shutdown.
+    //
+  public:
+    // Unless already shut down, call shutdown() but ignore errors.
+    //
+    ~scheduler ();
+
+    // Create a shut down scheduler.
+    //
+    scheduler () = default;
+
+    // Create a started up scheduler.
+    //
+    // The initial active argument is the number of threads to assume are
+    // already active (e.g., the calling thread). It must not be 0 (since
+    // someone has to schedule the first task).
+    //
+    // If maximum threads is unspecified, then a generally appropriate default
+    // limit is used.
+    //
+    scheduler (size_t max_active,
+               size_t init_active = 1,
+               size_t max_threads = 0)
+    {
+      startup (max_active, init_active, max_threads);
+    }
+
+    // Note: naturally not thread-safe.
+    //
+    void
+    startup (size_t max_active,
+             size_t init_active = 1,
+             size_t max_threads = 0);
+
+    // Wait for all the helper threads to terminate. Throw system_error on
+    // failure. Note that the initially active threads are not waited for.
+    // Return scheduling statistics.
+    //
+    struct stat
+    {
+      size_t thread_max_active     = 0; // max # of active threads allowed.
+      size_t thread_max_total      = 0; // max # of total threads allowed.
+      size_t thread_helpers        = 0; // # of helper threads created.
+      size_t thread_max_waiting    = 0; // max # of waiters at any time.
+
+      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
+      size_t task_queue_full       = 0; // # of times task queue was full.
+
+      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
+      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
+    };
+
+    stat
+    shutdown ();
+
+    // Return the number of hardware threads or 0 if unable to determine.
+    //
+    static size_t
+    hardware_concurrency ()
+    {
+      return std::thread::hardware_concurrency ();
+    }
+
+  private:
+    using lock = std::unique_lock<std::mutex>;
+
+    void
+    activate_helper (lock&);
+
+    void
+    create_helper (lock&);
+
+    // We restrict ourselves to a single pointer as an argument in hope of
+    // a small object optimization.
+    //
+    static void
+    helper (void*);
+
+    void
+    suspend (atomic_count& task_count);
+
+    void
+    resume (atomic_count& task_count);
+
+    // Task encapsulation.
+    //
+    template <typename F, typename... A>
+    struct task_type
+    {
+      atomic_count* task_count;
+      std::decay_t<F> func;
+      std::tuple<std::decay_t<A>...> args;
+
+      template <size_t... i>
+      void
+      thunk (std::index_sequence<i...>)
+      {
+        move (func) (std::get<i> (move (args))...);
+      }
+    };
+
+    template <typename F, typename... A>
+    static void
+    task_thunk (scheduler& s, lock& ql, void* td)
+    {
+      using task = task_type<F, A...>;
+
+      // Move the data and release the lock.
+      //
+      task t (move (*static_cast<task*> (td)));
+      ql.unlock ();
+
+      t.thunk (std::index_sequence_for<A...> ());
+
+      atomic_count& tc (*t.task_count);
+      if (--tc == 0)
+        s.resume (tc); // Resume a waiter, if any.
+    }
+
+    template <typename T>
+    static std::decay_t<T>
+    decay_copy (T&& x) {return forward<T> (x);}
+
+  private:
+    std::mutex mutex_;
+
+    bool shutdown_ = true;  // Shutdown flag.
+    bool task_     = false; // Task queued flag (see below).
+
+    // The constraints that we must maintain:
+    //
+    // active <= max_active
+    // (init_active + helpers) <= max_threads
+    //
+    // Note that the first three are immutable between startup() and
+    // shutdown() so can be accessed without a lock (@@ What about
+    // the case of multiple initially active?).
+    //
+    size_t init_active_ = 0; // Initially active threads.
+    size_t max_active_  = 0; // Maximum number of active threads.
+    size_t max_threads_ = 0; // Maximum number of total threads.
+
+    size_t helpers_ = 0; // Number of helper threads created so far.
+
+    // Every thread that we manage must be accounted for in one of these
+    // counters. And their sum should equal (init_active + helpers).
+    //
+    size_t active_   = 0; // Active master threads executing a task.
+    size_t idle_     = 0; // Idle helper threads waiting for a task.
+    size_t waiting_  = 0; // Suspended master threads waiting for their tasks.
+    size_t ready_    = 0; // Ready master threads waiting to become active.
+    size_t starting_ = 0; // Helper threads starting up.
+
+    std::condition_variable idle_condv_;  // Idle helpers queue.
+    std::condition_variable ready_condv_; // Ready masters queue.
+
+    // Statistics counters.
+    //
+    size_t stat_max_waiters_;
+    size_t stat_wait_collisions_;
+
+    // Wait queue.
+    //
+    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
+    // they re-examine their respective conditions and either carry on or
+    // block again.
+    //
+    // The wait queue is a shard of slots. A thread picks a slot based on the
+    // address of its task count variable. How many slots do we need? This
+    // depends on the number of waiters that we can have which cannot be
+    // greater than the total number of threads.
+    //
+    struct wait_slot
+    {
+      std::mutex mutex;
+      std::condition_variable condv;
+      size_t waiters = 0;
+      bool shutdown = true;
+    };
+
+    size_t wait_queue_size_; // Multiple of max_threads.
+    unique_ptr<wait_slot[]> wait_queue_;
+
+    // Task queue.
+    //
+    // Each queue has its own mutex. If the task_ flag above is true then
+    // there *might* be a task in one of the queues. If it is false, then
+    // it means there are either no tasks or someone is busy working on
+    // them.
+    //
+    // For now we only support trivially-destructible tasks.
+    //
+    struct task_data
+    {
+      std::aligned_storage<sizeof (void*) * 8>::type data;
+      void (*thunk) (scheduler&, lock&, void*);
+    };
+
+    // We have two requirements: Firstly, we want to keep the master thread
+    // (the one that called wait()) busy working through its own queue for as
+    // long as possible before (if at all) it "reincarnates" as a helper. The
+    // main reason for this is the limited number of helpers we can create.
+    //
+    // Secondly, we don't want to block wait() longer than necessary since the
+    // master thread can do some work with the result. Plus, overall, we want
+    // to "unwind" task hierarchies as soon as possible since they hold up
+    // resources such as the thread's stack. All this means that the master
+    // thread can only work through tasks that it has queued at this "level"
+    // of the async()/wait() calls since we know that wait() cannot return
+    // until they are done.
+    //
+    // To satisfy the first requirement, the master and helper threads get the
+    // tasks from different ends of the queue: master from the back while
+    // helpers from the front. And the master always adds new tasks to the
+    // back.
+    //
+    // To satisfy the second requirement, the master thread stores the index
+    // of the first task it has queued at this "level" and makes sure it
+    // doesn't try to dequeue any task beyond that.
+    //
+    size_t task_queue_depth_; // Multiple of max_active.
+
+    struct task_queue
+    {
+      std::mutex mutex;
+      bool shutdown = false;
+
+      size_t stat_full = 0; // Number of times push() returned NULL.
+
+      // head <= stop <= tail
+      //
+      size_t head = 0; // Index of the first element.
+      size_t stop = 0; // Index of the element to stop at in pop_back().
+      size_t tail = 0; // Index one past the last element.
+
+      unique_ptr<task_data[]> data;
+
+      task_queue (size_t depth): data (new task_data[depth]) {}
+    };
+
+    // Task queue API. Expects the queue mutex to be locked.
+    //
+
+    // Push a new task to the queue returning a pointer to the task data to be
+    // filled or NULL if the queue is full.
+    //
+    task_data*
+    push (task_queue& tq)
+    {
+      return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+    }
+
+    bool
+    empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+
+    void
+    pop_front (task_queue& tq, lock& ql)
+    {
+      task_data& td (tq.data[tq.head++]);
+
+      if (tq.head == tq.tail)
+        tq.stop = tq.head = tq.tail = 0; // Reset.
+      else if (tq.head > tq.stop)
+        tq.stop = tq.head;
+
+      // The thunk moves the task data to its stack, releases the lock,
+      // and continues to execute the task.
+      //
+      td.thunk (*this, ql, &td.data);
+    }
+
+    bool
+    empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+
+    void
+    pop_back (task_queue& tq, lock& ql)
+    {
+      task_data& td (tq.data[--tq.tail]);
+
+      if (tq.head == tq.tail)
+        tq.stop = tq.head = tq.tail = 0;
+
+      td.thunk (*this, ql, &td.data);
+    }
+
+    // Each thread has its own queue. Instead of allocating all max_threads of
+    // them up front, we will reserve the space but will only construct queues
+    // on demand.
+    //
+    vector<unique_ptr<task_queue>> task_queues_;
+
+    // TLS cache of each thread's task queue.
+    //
+    thread_local static task_queue* task_queue_;
+
+    task_queue&
+    create_queue ();
+  };
+
+  // Main (and only) scheduler. Started up and shut down in main().
+  //
+  extern scheduler sched;
+}
+
+#endif // BUILD2_SCHEDULER
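For reference, here is a minimal sketch of how a master thread might drive this
interface, in the spirit of the "update all the prerequisites" example from the
header comment. It is not part of the commit: the target struct and the
update()/update_all() functions are hypothetical; only scheduler::async(),
scheduler::wait(), scheduler::atomic_count, and the global sched instance come
from the header above.

#include <string>
#include <vector>
#include <functional> // std::ref()

#include <build2/scheduler>

namespace build2
{
  struct target // Hypothetical stand-in for a real target type.
  {
    std::string name;
    bool updated = false;
  };

  // A task returns void and must not throw; its result is communicated back
  // through an "out" argument (here the updated member).
  //
  static void
  update (target& t)
  {
    t.updated = true; // Placeholder for the real work (e.g., running a process).
  }

  // Master thread: schedule one task per target, then wait for all of them.
  //
  void
  update_all (std::vector<target>& ts)
  {
    // Counts the tasks scheduled at this "level"; it must stay alive, and the
    // tasks' results must not be read, until wait() returns.
    //
    scheduler::atomic_count task_count (0);

    for (target& t: ts)
    {
      // Arguments are decay-copied as with std::thread, so pass the target by
      // reference explicitly. async() either queues the task or, if it cannot,
      // runs it synchronously right here.
      //
      sched.async (task_count, &update, std::ref (t));
    }

    // Work through/await the scheduled tasks; throws system_error(ECANCELED)
    // if the scheduler is shut down.
    //
    sched.wait (task_count);
  }
}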