// file      : build2/scheduler -*- C++ -*-
// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#ifndef BUILD2_SCHEDULER
#define BUILD2_SCHEDULER

#include <mutex>
#include <tuple>
#include <atomic>
#include <type_traits> // aligned_storage, etc
#include <thread>      // thread::hardware_concurrency()
#include <condition_variable>

#include <build2/types>
#include <build2/utility>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks
  // (e.g., running a process), where in comparison thread synchronization
  // overhead is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can
  // be done in parallel (e.g., update all the prerequisites or run all the
  // tests). To accomplish this, the master, via a call to async(), can ask
  // the scheduler to run a task in another thread (called "helper"). If a
  // helper is available, then the task is executed asynchronously by such a
  // helper. Otherwise, the task is executed synchronously as part of the
  // async() call. Once the master thread has scheduled all the tasks, it
  // calls wait() to await their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time (thus the reason why async() may choose to perform the task
  // synchronously). When a master thread calls wait(), it is "suspended"
  // until all its asynchronous tasks are completed (at which point it
  // becomes "ready"). A suspension of a master results in either another
  // ready master being "resumed" or another helper thread becoming
  // available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case
  // the helper is suspended) or to the helper becoming available to perform
  // another task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were reused
  // as a helper, then it could be blocked on a nested wait() further down
  // the stack. This means that the number of threads created by the
  // scheduler will normally exceed the maximum active allowed.
  //
  class scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the result
    // of a task is communicated back to the master thread is ad hoc, usually
    // via "out" arguments. Such result(s) can only be retrieved by the
    // master once its task count reaches 0.
    //
    // The argument passing semantics is the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for the by-reference semantics).
    //
    // If the scheduler is shut down, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    void
    async (atomic_count& task_count, F&&, A&&...);

    // Wait until the task count reaches 0. If the scheduler is shut down
    // while waiting, throw system_error(ECANCELED).
    //
    void
    wait (atomic_count& task_count);
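
    // For example, scheduling two tasks and waiting for both to complete
    // could look along these lines (a minimal sketch: s is a started up
    // scheduler while update(), t1, and t2 are hypothetical names, not
    // part of this interface):
    //
    //   scheduler::atomic_count tc (0);
    //   s.async (tc, [] (target& t) {update (t);}, ref (t1));
    //   s.async (tc, [] (target& t) {update (t);}, ref (t2));
    //   s.wait (tc); // Once it returns, results in t1/t2 can be examined.
    //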

    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If the maximum threads or task queue depth arguments are unspecified,
    // then appropriate defaults are used.
    //
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0,
               size_t queue_depth = 0)
    {
      startup (max_active, init_active, max_threads, queue_depth);
    }

    // Start the scheduler.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0,
             size_t queue_depth = 0);

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.

      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times a slot had been
                                        // occupied.
    };

    stat
    shutdown ();

    // If initially active thread(s) (besides the one that calls startup())
    // exist before the call to startup(), then they must call join() before
    // executing any tasks. The two common cases where you don't have to
    // call join() are a single active thread that calls startup()/shutdown()
    // or active thread(s) that are created after startup().
    //
    void
    join ()
    {
      assert (task_queue_ == nullptr);

      // Lock the mutex to make sure the values set in startup() are visible
      // in this thread.
      //
      lock l (mutex_);
    }

    // If initially active thread(s) participate in multiple schedulers
    // and/or sessions (intervals between startup() and shutdown()), then
    // they must call leave() before joining another scheduler/session. Note
    // that this also applies to the active thread that calls shutdown().
    // Note also that a thread can only participate in one scheduler at a
    // time.
    //
    void
    leave ()
    {
      task_queue_ = nullptr;
    }

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      return std::thread::hardware_concurrency ();
    }
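
    // For reference, the common single active thread lifecycle described
    // above could look along these lines (a minimal sketch, not part of
    // this interface):
    //
    //   scheduler s (scheduler::hardware_concurrency ()); // Calls startup().
    //   // ... async()/wait() calls from the (initially active) thread ...
    //   scheduler::stat st (s.shutdown ());
    //
    // Per the notes above, any other initially active thread would have to
    // call join() before executing tasks and leave() before joining another
    // scheduler/session.
    //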

  private:
    using lock = std::unique_lock<std::mutex>;

    void
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in hope of
    // a small object optimization.
    //
    static void
    helper (void*);

    void
    suspend (atomic_count& task_count);

    void
    resume (atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      using func_type = std::decay_t<F>;
      using args_type = std::tuple<std::decay_t<A>...>;

      atomic_count* task_count;
      func_type func;
      args_type args;

      template <size_t... i>
      void
      thunk (std::index_sequence<i...>)
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler&, lock&, void*);

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x) {return forward<T> (x);}

  private:
    std::mutex mutex_;

    bool shutdown_ = true; // Shutdown flag.
    bool task_ = false;    // Task queued flag (see below).

    // The constraints that we must maintain:
    //
    // active <= max_active
    // (init_active + helpers) <= max_threads
    //
    // Note that the first three are immutable between startup() and
    // shutdown() so they can be accessed without a lock (but see join()).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_ = 0;  // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_ = 0; // Number of helper threads created so far.

    // Every thread that we manage must be accounted for in one of these
    // counters. And their sum should equal (init_active + helpers).
    //
    size_t active_ = 0;   // Active master threads executing a task.
    size_t idle_ = 0;     // Idle helper threads waiting for a task.
    size_t waiting_ = 0;  // Suspended master threads waiting for their tasks.
    size_t ready_ = 0;    // Ready master threads waiting to become active.
    size_t starting_ = 0; // Helper threads starting up.

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
    // they re-examine their respective conditions and either carry on or
    // block again.
    //
    // The wait queue is a shard of slots. A thread picks a slot based on
    // the address of its task count variable. How many slots do we need?
    // This depends on the number of waiters that we can have which cannot
    // be greater than the total number of threads.
    //
    struct wait_slot
    {
      std::mutex mutex;
      std::condition_variable condv;
      size_t waiters = 0;
      bool shutdown = true;
    };

    size_t wait_queue_size_; // Multiple of max_threads.
    unique_ptr<wait_slot[]> wait_queue_;
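
    // For illustration, picking a slot for a task count could look along
    // these lines (a sketch of the address-based selection described above,
    // not necessarily the exact computation used):
    //
    //   wait_slot& ws (
    //     wait_queue_[
    //       std::hash<atomic_count*> () (&task_count) % wait_queue_size_]);
    //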

    // Task queue.
    //
    // Each queue has its own mutex. If the task_ flag above is true then
    // there *might* be a task in one of the queues. If it is false, then
    // it means there are either no tasks or someone is busy working on
    // them.
    //
    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      std::aligned_storage<sizeof (void*) * 8>::type data;
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master thread
    // (the one that called wait()) busy working through its own queue for
    // as long as possible before (if at all) it "reincarnates" as a helper.
    // The main reason for this is the limited number of helpers we can
    // create.
    //
    // Secondly, we don't want to block wait() longer than necessary since
    // the master thread can do some work with the result. Plus, overall, we
    // want to "unwind" task hierarchies as soon as possible since they hold
    // up resources such as threads' stacks. All this means that the master
    // thread can only work through tasks that it has queued at this "level"
    // of the async()/wait() calls since we know that wait() cannot return
    // until they are done.
    //
    // To satisfy the first requirement, the master and helper threads get
    // the tasks from different ends of the queue: the master from the back
    // while the helpers from the front. And the master always adds new
    // tasks to the back.
    //
    // To satisfy the second requirement, the master thread stores the index
    // of the first task it has queued at this "level" and makes sure it
    // doesn't try to dequeue any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.

    struct task_queue
    {
      std::mutex mutex;
      bool shutdown = false;

      size_t stat_full = 0; // Number of times push() returned NULL.

      // Our task queue is circular with head being the index of the first
      // element and tail -- of the last. Since this makes the empty and one
      // element cases indistinguishable, we also keep the size.
      //
      // The mark is an index somewhere between (figuratively speaking) the
      // head and the tail, if enabled. If the mark is hit, then it is
      // disabled until the queue becomes empty.
      //
      size_t head = 0;
      size_t mark = 0;
      size_t tail = 0;
      size_t size = 0;

      unique_ptr<task_data[]> data;

      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //
    // Push a new task to the queue returning a pointer to the task data to
    // be filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);

      if (s != task_queue_depth_)
      {
        //                                         normal  wrap empty
        //                                         |       |    |
        t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
        s++;
        return &tq.data[t];
      }

      return nullptr;
    }

    bool
    empty_front (task_queue& tq) const {return tq.size == 0;}

    void
    pop_front (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& h (tq.head);
      size_t& m (tq.mark);

      bool a (h == m); // Adjust the mark?
      task_data& td (tq.data[h]);

      //                                         normal  wrap empty
      //                                         |       |    |
      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

      if (--s == 0 || a)
        m = h; // Reset or adjust the mark.

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);
      ql.lock ();
    }

    bool
    empty_back (task_queue& tq) const
    {
      return tq.size == 0 || tq.mark == task_queue_depth_;
    }

    void
    pop_back (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      bool a (t == m); // Adjust the mark?
      task_data& td (tq.data[t]);

      // Save the old queue mark and set the new one in case the task we are
      // about to run adds sub-tasks.
      //
      size_t om (m);
      m = t; // Where the next push() will go.

      //                     normal  wrap                 empty
      //                     |       |                    |
      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
      --s;

      td.thunk (*this, ql, &td.data);
      ql.lock ();

      // Restore the old mark (which we might have to adjust).
      //
      if (s == 0)
        m = t;                 // Reset the mark.
      else if (a)
        m = task_queue_depth_; // Disable the mark.
      else
        // What happens if the head goes past the old mark? In this case we
        // will get into the empty queue state before we end up making any
        // (wrong) decisions based on this value. Unfortunately there is no
        // way to detect this (and do some sanity asserts) since things can
        // wrap around.
        //
        // To put it another way, the understanding here is that after the
        // task returns we will either have an empty queue or there will
        // still be tasks between the old mark and the current tail,
        // something along these lines:
        //
        // OOOOOXXXXOOO
        //      |  |  |
        //      m  h  t
        //
        m = om;
    }

    // Each thread has its own queue. Instead of allocating all max_threads
    // of them up front, we will reserve the space but will only construct
    // the queues on demand.
    //
    vector<unique_ptr<task_queue>> task_queues_;

    // TLS cache of the thread's task queue.
    //
    static
#ifdef BUTL_CXX11_THREAD_LOCAL
    thread_local
#else
    __thread
#endif
    task_queue* task_queue_;

    task_queue&
    create_queue ();
  };

  // Main (and only) scheduler. Started up and shut down in main().
  //
  extern scheduler sched;
}

#include <build2/scheduler.txx>

#endif // BUILD2_SCHEDULER